/*
 * CommitProxyServer.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// NOTE(review): the original system header names were lost to text extraction
// (three bare "#include" directives); restored below from the stdlib names used
// in this file (std::min/max/find, std::pair/move, std::variant) — confirm
// against upstream.
#include <algorithm>
#include <tuple>
#include <variant>

#include "fdbclient/Atomic.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/ProxyCommitData.actor.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Trace.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) { state ReplyPromise reply = req.reply; resetReply(req); std::vector> replies; int currentStream = 0; std::vector broadcastEndpoints = req.broadcastInfo; for (int i = 0; i < sendAmount && currentStream < broadcastEndpoints.size(); i++) { std::vector endpoints; RequestStream cur(broadcastEndpoints[currentStream++]); while (currentStream < broadcastEndpoints.size() * (i + 1) / sendAmount) { endpoints.push_back(broadcastEndpoints[currentStream++]); } req.broadcastInfo = endpoints; replies.push_back(brokenPromiseToNever(cur.getReply(req))); resetReply(req); } wait(waitForAll(replies)); if (sendReply) { reply.send(Void()); } return Void(); } ACTOR void discardCommit(UID id, Future fcm, Future dummyCommitState) { ASSERT(!dummyCommitState.isReady()); LogSystemDiskQueueAdapter::CommitMessage cm = wait(fcm); TraceEvent("Discarding", id).detail("Count", cm.messages.size()); cm.acknowledge.send(Void()); ASSERT(dummyCommitState.isReady()); } struct ResolutionRequestBuilder { ProxyCommitData* self; std::vector requests; std::vector> transactionResolverMap; std::vector outTr; std::vector>> txReadConflictRangeIndexMap; // Used to report conflicting keys, the format is // [CommitTransactionRef_Index][Resolver_Index][Read_Conflict_Range_Index_on_Resolver] // -> read_conflict_range's original index in the commitTransactionRef ResolutionRequestBuilder(ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion, Span& parentSpan) : self(self), requests(self->resolvers.size()) { for (auto& req : requests) { req.spanContext = parentSpan.context; req.prevVersion = prevVersion; req.version = version; req.lastReceivedVersion = lastReceivedVersion; } } CommitTransactionRef& getOutTransaction(int resolver, Version read_snapshot) { CommitTransactionRef*& out = outTr[resolver]; if (!out) { ResolveTransactionBatchRequest& request = requests[resolver]; 
request.transactions.resize(request.arena, request.transactions.size() + 1); out = &request.transactions.back(); out->read_snapshot = read_snapshot; } return *out; } void addTransaction(CommitTransactionRequest& trRequest, int transactionNumberInBatch) { auto& trIn = trRequest.transaction; // SOMEDAY: There are a couple of unnecessary O( # resolvers ) steps here outTr.assign(requests.size(), nullptr); ASSERT(transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768); bool isTXNStateTransaction = false; for (auto& m : trIn.mutations) { if (m.type == MutationRef::SetVersionstampedKey) { transformVersionstampMutation(m, &MutationRef::param1, requests[0].version, transactionNumberInBatch); trIn.write_conflict_ranges.push_back(requests[0].arena, singleKeyRange(m.param1, requests[0].arena)); } else if (m.type == MutationRef::SetVersionstampedValue) { transformVersionstampMutation(m, &MutationRef::param2, requests[0].version, transactionNumberInBatch); } if (isMetadataMutation(m)) { isTXNStateTransaction = true; getOutTransaction(0, trIn.read_snapshot).mutations.push_back(requests[0].arena, m); } } if (isTXNStateTransaction && !trRequest.isLockAware()) { // This mitigates https://github.com/apple/foundationdb/issues/3647. Since this transaction is not lock // aware, if this transaction got a read version then \xff/dbLocked must not have been set at this // transaction's read snapshot. If that changes by commit time, then it won't commit on any proxy because of // a conflict. A client could set a read version manually so this isn't totally bulletproof. 
trIn.read_conflict_ranges.push_back(trRequest.arena, KeyRangeRef(databaseLockedKey, databaseLockedKeyEnd)); } std::vector> rCRIndexMap( requests.size()); // [resolver_index][read_conflict_range_index_on_the_resolver] // -> read_conflict_range's original index for (int idx = 0; idx < trIn.read_conflict_ranges.size(); ++idx) { const auto& r = trIn.read_conflict_ranges[idx]; auto ranges = self->keyResolvers.intersectingRanges(r); std::set resolvers; for (auto& ir : ranges) { auto& version_resolver = ir.value(); for (int i = version_resolver.size() - 1; i >= 0; i--) { resolvers.insert(version_resolver[i].second); if (version_resolver[i].first < trIn.read_snapshot) break; } } ASSERT(resolvers.size()); for (int resolver : resolvers) { getOutTransaction(resolver, trIn.read_snapshot) .read_conflict_ranges.push_back(requests[resolver].arena, r); rCRIndexMap[resolver].push_back(idx); } } txReadConflictRangeIndexMap.push_back(std::move(rCRIndexMap)); for (auto& r : trIn.write_conflict_ranges) { auto ranges = self->keyResolvers.intersectingRanges(r); std::set resolvers; for (auto& ir : ranges) resolvers.insert(ir.value().back().second); ASSERT(resolvers.size()); for (int resolver : resolvers) getOutTransaction(resolver, trIn.read_snapshot) .write_conflict_ranges.push_back(requests[resolver].arena, r); } if (isTXNStateTransaction) { for (int r = 0; r < requests.size(); r++) { int transactionNumberInRequest = &getOutTransaction(r, trIn.read_snapshot) - requests[r].transactions.begin(); requests[r].txnStateTransactions.push_back(requests[r].arena, transactionNumberInRequest); } // Note only Resolver 0 got the correct spanContext, which means // the reply from Resolver 0 has the right one back. 
getOutTransaction(0, trIn.read_snapshot).spanContext = trRequest.spanContext; } std::vector resolversUsed; for (int r = 0; r < outTr.size(); r++) if (outTr[r]) { resolversUsed.push_back(r); outTr[r]->report_conflicting_keys = trIn.report_conflicting_keys; } transactionResolverMap.emplace_back(std::move(resolversUsed)); } }; ACTOR Future commitBatcher(ProxyCommitData* commitData, PromiseStream, int>> out, FutureStream in, int desiredBytes, int64_t memBytesLimit) { wait(delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher)); state double lastBatch = 0; loop { state Future timeout; state std::vector batch; state int batchBytes = 0; if (SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL <= 0) { timeout = Never(); } else { timeout = delayJittered(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL, TaskPriority::ProxyCommitBatcher); } while (!timeout.isReady() && !(batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX || batchBytes >= desiredBytes)) { choose { when(CommitTransactionRequest req = waitNext(in)) { // WARNING: this code is run at a high priority, so it needs to do as little work as possible int bytes = getBytes(req); // Drop requests if memory is under severe pressure if (commitData->commitBatchesMemBytesCount + bytes > memBytesLimit) { ++commitData->stats.txnCommitErrors; req.reply.sendError(proxy_memory_limit_exceeded()); TraceEvent(SevWarnAlways, "ProxyCommitBatchMemoryThresholdExceeded") .suppressFor(60) .detail("MemBytesCount", commitData->commitBatchesMemBytesCount) .detail("MemLimit", memBytesLimit); continue; } if (bytes > FLOW_KNOBS->PACKET_WARNING) { TraceEvent(!g_network->isSimulated() ? 
SevWarnAlways : SevWarn, "LargeTransaction") .suppressFor(1.0) .detail("Size", bytes) .detail("Client", req.reply.getEndpoint().getPrimaryAddress()); } ++commitData->stats.txnCommitIn; if (req.debugID.present()) { g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "CommitProxyServer.batcher"); } if (!batch.size()) { if (now() - lastBatch > commitData->commitBatchInterval) { timeout = delayJittered(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, TaskPriority::ProxyCommitBatcher); } else { timeout = delayJittered(commitData->commitBatchInterval - (now() - lastBatch), TaskPriority::ProxyCommitBatcher); } } if ((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) { out.send({ std::move(batch), batchBytes }); lastBatch = now(); timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher); batch.clear(); batchBytes = 0; } batch.push_back(req); batchBytes += bytes; commitData->commitBatchesMemBytesCount += bytes; } when(wait(timeout)) {} } } out.send({ std::move(batch), batchBytes }); lastBatch = now(); } } void createWhitelistBinPathVec(const std::string& binPath, std::vector>& binPathVec) { TraceEvent(SevDebug, "BinPathConverter").detail("Input", binPath); StringRef input(binPath); while (input != StringRef()) { StringRef token = input.eat(LiteralStringRef(",")); if (token != StringRef()) { const uint8_t* ptr = token.begin(); while (ptr != token.end() && *ptr == ' ') { ptr++; } if (ptr != token.end()) { Standalone newElement(token.substr(ptr - token.begin())); TraceEvent(SevDebug, "BinPathItem").detail("Element", newElement); binPathVec.push_back(newElement); } } } return; } bool isWhitelisted(const std::vector>& binPathVec, StringRef binPath) { TraceEvent("BinPath").detail("Value", binPath); for (const auto& item : binPathVec) { TraceEvent("Element").detail("Value", item); } return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end(); } ACTOR 
Future addBackupMutations(ProxyCommitData* self, const std::map* logRangeMutations, LogPushData* toCommit, Version commitVersion, double* computeDuration, double* computeStart) { state std::map::const_iterator logRangeMutation = logRangeMutations->cbegin(); state int32_t version = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; state int yieldBytes = 0; state BinaryWriter valueWriter(Unversioned()); toCommit->addTransactionInfo(SpanID()); // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) { // FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield valueWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withBackupMutations())); valueWriter << logRangeMutation->second.totalSize(); state MutationListRef::Blob* blobIter = logRangeMutation->second.blob_begin; while (blobIter) { if (yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) { yieldBytes = 0; if (g_network->check_yield(TaskPriority::ProxyCommitYield1)) { *computeDuration += g_network->timer() - *computeStart; wait(delay(0, TaskPriority::ProxyCommitYield1)); *computeStart = g_network->timer(); } } valueWriter.serializeBytes(blobIter->data); yieldBytes += blobIter->data.size(); blobIter = blobIter->next; } Key val = valueWriter.toValue(); BinaryWriter wr(Unversioned()); // Serialize the log destination wr.serializeBytes(logRangeMutation->first); // Write the log keys and version information wr << (uint8_t)hashlittle(&version, sizeof(version), 0); wr << bigEndian64(commitVersion); MutationRef backupMutation; backupMutation.type = MutationRef::SetValue; uint32_t* partBuffer = nullptr; for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < val.size(); part++) { // Assign the second parameter as the part backupMutation.param2 = val.substr( part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, std::min(val.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, CLIENT_KNOBS->MUTATION_BLOCK_SIZE)); // Write the 
last part of the mutation to the serialization, if the buffer is not defined if (!partBuffer) { // Serialize the part to the writer wr << bigEndian32(part); // Define the last buffer part partBuffer = (uint32_t*)((char*)wr.getData() + wr.getLength() - sizeof(uint32_t)); } else { *partBuffer = bigEndian32(part); } // Define the mutation type and and location backupMutation.param1 = wr.toValue(); ASSERT(backupMutation.param1.startsWith( logRangeMutation->first)); // We are writing into the configured destination auto& tags = self->tagsForKey(backupMutation.param1); toCommit->addTags(tags); toCommit->writeTypedMessage(backupMutation); // if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) { // TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", // describe(tags)).detail("BackupMutation", backupMutation.toString()) // .detail("BackupMutationSize", val.size()).detail("Version", commitVersion).detail("DestPath", // logRangeMutation.first) .detail("PartIndex", part).detail("PartIndexEndian", // bigEndian32(part)).detail("PartData", backupMutation.param1); // } } } return Void(); } ACTOR Future releaseResolvingAfter(ProxyCommitData* self, Future releaseDelay, int64_t localBatchNumber) { wait(releaseDelay); ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber - 1); self->latestLocalCommitBatchResolving.set(localBatchNumber); return Void(); } ACTOR static Future trackResolutionMetrics(Reference dist, Future in) { state double startTime = now(); ResolveTransactionBatchReply reply = wait(in); dist->sampleSeconds(now() - startTime); return reply; } namespace CommitBatch { struct CommitBatchContext { using StoreCommit_t = std::vector, Future>>; ProxyCommitData* const pProxyCommitData; std::vector trs; int currentBatchMemBytesCount; double startTime; Optional debugID; bool forceRecovery = false; bool rejected = false; // If rejected due to long queue length int64_t localBatchNumber; LogPushData toCommit; int batchOperations = 0; Span 
span; int64_t batchBytes = 0; int latencyBucket = 0; Version commitVersion; Version prevVersion; int64_t maxTransactionBytes; std::vector> transactionResolverMap; std::vector>> txReadConflictRangeIndexMap; Future releaseDelay; Future releaseFuture; std::vector resolution; double computeStart; double computeDuration = 0; Arena arena; /// true if the batch is the 1st batch for this proxy, additional metadata /// processing is involved for this batch. bool isMyFirstBatch; bool firstStateMutations; Optional oldCoordinators; StoreCommit_t storeCommits; std::vector committed; Optional lockedKey; bool locked; int commitCount = 0; std::vector nextTr; bool lockedAfter; Optional metadataVersionAfter; int mutationCount = 0; int mutationBytes = 0; std::map logRangeMutations; Arena logRangeMutationsArena; int transactionNum = 0; int yieldBytes = 0; LogSystemDiskQueueAdapter::CommitMessage msg; Future loggingComplete; double commitStartTime; std::unordered_map tpcvMap; // obtained from sequencer std::set writtenTLogs; // the set of tlog locations written to in the mutation. std::set writtenTags; // final set tags written to in the batch std::set writtenTagsPreResolution; // tags written to in the batch not including any changes from the resolver. 
bool hasMetadataMutation = false; bool metadataMutationFromProxy = false; CommitBatchContext(ProxyCommitData*, const std::vector*, const int); void setupTraceBatch(); std::set getWrittenTagsPreResolution(); private: void evaluateBatchSize(); }; std::set CommitBatchContext::getWrittenTagsPreResolution() { std::set transactionTags; std::vector cacheVector = { cacheTag }; for (int transactionNum = 0; transactionNum < trs.size(); transactionNum++) { int mutationNum = 0; VectorRef* pMutations = &trs[transactionNum].transaction.mutations; for (; mutationNum < pMutations->size(); mutationNum++) { auto& m = (*pMutations)[mutationNum]; if (isSingleKeyMutation((MutationRef::Type)m.type)) { auto& tags = pProxyCommitData->tagsForKey(m.param1); transactionTags.insert(tags.begin(), tags.end()); toCommit.getLocations(tags, writtenTLogs); if (pProxyCommitData->cacheInfo[m.param1]) { toCommit.getLocations(cacheVector, writtenTLogs); transactionTags.insert(cacheTag); } } else if (m.type == MutationRef::ClearRange) { KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2)); auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange); auto firstRange = ranges.begin(); ++firstRange; if (firstRange == ranges.end()) { std::set filteredTags; ranges.begin().value().populateTags(); filteredTags.insert(ranges.begin().value().tags.begin(), ranges.begin().value().tags.end()); transactionTags.insert(ranges.begin().value().tags.begin(), ranges.begin().value().tags.end()); toCommit.getLocations(filteredTags, writtenTLogs); } else { std::set allSources; for (auto r : ranges) { r.value().populateTags(); allSources.insert(r.value().tags.begin(), r.value().tags.end()); transactionTags.insert(r.value().tags.begin(), r.value().tags.end()); } toCommit.getLocations(allSources, writtenTLogs); } if (pProxyCommitData->needsCacheTag(clearRange)) { toCommit.getLocations(cacheVector, writtenTLogs); transactionTags.insert(cacheTag); } } else { UNREACHABLE(); } } hasMetadataMutation = 
containsMetadataMutation(trs[transactionNum].transaction.mutations); } return transactionTags; } CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_, const std::vector* trs_, const int currentBatchMemBytesCount) : pProxyCommitData(pProxyCommitData_), trs(std::move(*const_cast*>(trs_))), currentBatchMemBytesCount(currentBatchMemBytesCount), startTime(g_network->now()), localBatchNumber(++pProxyCommitData->localCommitBatchesStarted), toCommit(pProxyCommitData->logSystem), span("MP:commitBatch"_loc), committed(trs.size()) { evaluateBatchSize(); if (batchOperations != 0) { latencyBucket = std::min(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS - 1, SERVER_KNOBS->PROXY_COMPUTE_BUCKETS * batchBytes / (batchOperations * (CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->KEY_SIZE_LIMIT))); } // since we are using just the former to limit the number of versions actually in flight! ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT); } void CommitBatchContext::setupTraceBatch() { for (const auto& tr : trs) { if (tr.debugID.present()) { if (!debugID.present()) { debugID = nondeterministicRandom()->randomUniqueID(); } g_traceBatch.addAttach("CommitAttachID", tr.debugID.get().first(), debugID.get().first()); } span.addParent(tr.spanContext); } if (debugID.present()) { g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.Before"); } } void CommitBatchContext::evaluateBatchSize() { for (const auto& tr : trs) { const auto& mutations = tr.transaction.mutations; batchOperations += mutations.size(); batchBytes += mutations.expectedSize(); } } // Try to identify recovery transaction and backup's apply mutations (blind writes). // Both cannot be rejected and are approximated by looking at first mutation // starting with 0xff. 
bool canReject(const std::vector& trs) { for (const auto& tr : trs) { if (tr.transaction.mutations.empty()) continue; if (tr.transaction.mutations[0].param1.startsWith(LiteralStringRef("\xff")) || tr.transaction.read_conflict_ranges.empty()) { return false; } } return true; } ACTOR Future preresolutionProcessing(CommitBatchContext* self) { state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; state std::vector& trs = self->trs; state const int64_t localBatchNumber = self->localBatchNumber; state const int latencyBucket = self->latencyBucket; state const Optional& debugID = self->debugID; state Span span("MP:preresolutionProcessing"_loc, self->span.context); state double timeStart = now(); if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchResolving.get() > SERVER_KNOBS->RESET_MASTER_BATCHES && now() - self->pProxyCommitData->lastMasterReset > SERVER_KNOBS->RESET_MASTER_DELAY) { TraceEvent(SevWarnAlways, "ResetMasterNetwork", self->pProxyCommitData->dbgid) .detail("CurrentBatch", self->localBatchNumber) .detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchResolving.get()); FlowTransport::transport().resetConnection(self->pProxyCommitData->master.address()); self->pProxyCommitData->lastMasterReset = now(); } // Pre-resolution the commits TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1)); double queuingDelay = g_network->now() - timeStart; pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay); if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND || (g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) && SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) { // Disabled for the recovery transaction. otherwise, recovery can't finish and keeps doing more recoveries. 
TEST(true); // Reject transactions in the batch TraceEvent(SevWarnAlways, "ProxyReject", pProxyCommitData->dbgid) .suppressFor(0.1) .detail("QDelay", queuingDelay) .detail("Transactions", trs.size()) .detail("BatchNumber", localBatchNumber); ASSERT(pProxyCommitData->latestLocalCommitBatchResolving.get() == localBatchNumber - 1); pProxyCommitData->latestLocalCommitBatchResolving.set(localBatchNumber); wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1)); ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == localBatchNumber - 1); pProxyCommitData->latestLocalCommitBatchLogging.set(localBatchNumber); for (const auto& tr : trs) { tr.reply.sendError(transaction_too_old()); } ++pProxyCommitData->stats.commitBatchOut; pProxyCommitData->stats.txnCommitOut += trs.size(); pProxyCommitData->stats.txnRejectedForQueuedTooLong += trs.size(); self->rejected = true; return Void(); } self->releaseDelay = delay(std::min(SERVER_KNOBS->MAX_PROXY_COMPUTE, self->batchOperations * pProxyCommitData->commitComputePerOperation[latencyBucket]), TaskPriority::ProxyMasterVersionReply); if (debugID.present()) { g_traceBatch.addEvent( "CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.GettingCommitVersion"); } if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { self->writtenTagsPreResolution = self->getWrittenTagsPreResolution(); if (self->hasMetadataMutation) { int numLogs = pProxyCommitData->db->get().logSystemConfig.numLogs(); for (int i = 0; i < numLogs; i++) { self->writtenTLogs.insert(i); } } } GetCommitVersionRequest req(span.context, pProxyCommitData->commitVersionRequestNumber++, pProxyCommitData->mostRecentProcessedRequestNumber, pProxyCommitData->dbgid, self->writtenTLogs); state double beforeGettingCommitVersion = now(); GetCommitVersionReply versionReply = wait(brokenPromiseToNever( pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply))); if 
(SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { self->tpcvMap = versionReply.tpcvMap; } pProxyCommitData->mostRecentProcessedRequestNumber = versionReply.requestNum; pProxyCommitData->stats.txnCommitVersionAssigned += trs.size(); pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version; pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion); self->commitVersion = versionReply.version; self->prevVersion = versionReply.prevVersion; for (auto it : versionReply.resolverChanges) { auto rs = pProxyCommitData->keyResolvers.modify(it.range); for (auto r = rs.begin(); r != rs.end(); ++r) r->value().emplace_back(versionReply.resolverChangesVersion, it.dest); } //TraceEvent("ProxyGotVer", pProxyContext->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion); if (debugID.present()) { g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.GotCommitVersion"); } return Void(); } ACTOR Future getResolution(CommitBatchContext* self) { state double resolutionStart = now(); // Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with // resolution processing but is still using CPU ProxyCommitData* pProxyCommitData = self->pProxyCommitData; std::vector& trs = self->trs; state Span span("MP:getResolution"_loc, self->span.context); ResolutionRequestBuilder requests( pProxyCommitData, self->commitVersion, self->prevVersion, pProxyCommitData->version, span); int conflictRangeCount = 0; self->maxTransactionBytes = 0; for (int t = 0; t < trs.size(); t++) { requests.addTransaction(trs[t], t); conflictRangeCount += trs[t].transaction.read_conflict_ranges.size() + trs[t].transaction.write_conflict_ranges.size(); //TraceEvent("MPTransactionDump", self->dbgid).detail("Snapshot", trs[t].transaction.read_snapshot); // for(auto& m : trs[t].transaction.mutations) self->maxTransactionBytes = std::max(self->maxTransactionBytes, 
trs[t].transaction.expectedSize()); // TraceEvent("MPTransactionsDump", self->dbgid).detail("Mutation", m.toString()); } pProxyCommitData->stats.conflictRanges += conflictRangeCount; for (int r = 1; r < pProxyCommitData->resolvers.size(); r++) ASSERT(requests.requests[r].txnStateTransactions.size() == requests.requests[0].txnStateTransactions.size()); pProxyCommitData->stats.txnCommitResolving += trs.size(); std::vector> replies; for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) { requests.requests[r].debugID = self->debugID; replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r], brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply( requests.requests[r], TaskPriority::ProxyResolverReply)))); } self->transactionResolverMap.swap(requests.transactionResolverMap); // Used to report conflicting keys self->txReadConflictRangeIndexMap.swap(requests.txReadConflictRangeIndexMap); self->releaseFuture = releaseResolvingAfter(pProxyCommitData, self->releaseDelay, self->localBatchNumber); if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchLogging.get() > SERVER_KNOBS->RESET_RESOLVER_BATCHES && now() - self->pProxyCommitData->lastResolverReset > SERVER_KNOBS->RESET_RESOLVER_DELAY) { for (int r = 0; r < self->pProxyCommitData->resolvers.size(); r++) { TraceEvent(SevWarnAlways, "ResetResolverNetwork", self->pProxyCommitData->dbgid) .detail("PeerAddr", self->pProxyCommitData->resolvers[r].address()) .detail("CurrentBatch", self->localBatchNumber) .detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchLogging.get()); FlowTransport::transport().resetConnection(self->pProxyCommitData->resolvers[r].address()); } self->pProxyCommitData->lastResolverReset = now(); } // Wait for the final resolution std::vector resolutionResp = wait(getAll(replies)); self->resolution.swap(*const_cast*>(&resolutionResp)); self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart); if 
(self->debugID.present()) { g_traceBatch.addEvent( "CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution"); } return Void(); } void assertResolutionStateMutationsSizeConsistent(const std::vector& resolution) { for (int r = 1; r < resolution.size(); r++) { ASSERT(resolution[r].stateMutations.size() == resolution[0].stateMutations.size()); for (int s = 0; s < resolution[r].stateMutations.size(); s++) { ASSERT(resolution[r].stateMutations[s].size() == resolution[0].stateMutations[s].size()); } } } // Compute and apply "metadata" effects of each other proxy's most recent batch void applyMetadataEffect(CommitBatchContext* self) { bool initialState = self->isMyFirstBatch; self->firstStateMutations = self->isMyFirstBatch; for (int versionIndex = 0; versionIndex < self->resolution[0].stateMutations.size(); versionIndex++) { // pProxyCommitData->logAdapter->setNextVersion( ??? ); << Ideally we would be telling the log adapter that the // pushes in this commit will be in the version at which these state mutations were committed by another proxy, // but at present we don't have that information here. So the disk queue may be unnecessarily conservative // about popping. 
for (int transactionIndex = 0;
	     transactionIndex < self->resolution[0].stateMutations[versionIndex].size() && !self->forceRecovery;
	     transactionIndex++) {
		// NOTE(review): angle-bracket template arguments appear to have been stripped from this file during
		// extraction (e.g. `Optional`, `Future`, `std::set`); code tokens are preserved as-is here.
		// A state transaction only takes effect if every resolver reported it committed.
		bool committed = true;
		for (int resolver = 0; resolver < self->resolution.size(); resolver++)
			committed =
			    committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
		if (committed) {
			// Apply only to the local txnStateStore (pToCommit == nullptr): these mutations were
			// committed by another proxy; we just replay their metadata effect.
			applyMetadataMutations(SpanID(),
			                       *self->pProxyCommitData,
			                       self->arena,
			                       self->pProxyCommitData->logSystem,
			                       self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
			                       /* pToCommit= */ nullptr,
			                       self->forceRecovery,
			                       /* popVersion= */ 0,
			                       /* initialCommit */ false);
			if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST &&
			    containsMetadataMutation(
			        self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations)) {
				self->metadataMutationFromProxy = true;
			}
		}

		if (self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() &&
		    self->firstStateMutations) {
			ASSERT(committed);
			self->firstStateMutations = false;
			self->forceRecovery = false;
		}
	}

	// These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit message
	auto fcm = self->pProxyCommitData->logAdapter->getCommitMessage();
	self->storeCommits.emplace_back(fcm, self->pProxyCommitData->txnStateStore->commit());

	if (initialState) {
		initialState = false;
		self->forceRecovery = false;
		self->pProxyCommitData->txnStateStore->resyncLog();

		// Acknowledge (and thereby discard) every queued commit message; the paired commit future
		// must become ready once its message is acknowledged.
		for (auto& p : self->storeCommits) {
			ASSERT(!p.second.isReady());
			p.first.get().acknowledge.send(Void());
			ASSERT(p.second.isReady());
		}
		self->storeCommits.clear();
	}
}
}

/// Determine which transactions actually committed (conservatively) by combining results from the resolvers
void determineCommittedTransactions(CommitBatchContext* self) {
	auto pProxyCommitData = self->pProxyCommitData;
	const auto& trs = self->trs;

	ASSERT(self->transactionResolverMap.size() == self->committed.size());
	// For each commitTransactionRef, it is only sent to resolvers specified in transactionResolverMap
	// Thus, we use this nextTr to track the correct transaction index on each resolver.
	self->nextTr.resize(self->resolution.size());
	for (int t = 0; t < trs.size(); t++) {
		// std::min over the per-resolver verdicts: any resolver's conflict/too-old overrides committed.
		uint8_t commit = ConflictBatch::TransactionCommitted;
		for (int r : self->transactionResolverMap[t]) {
			commit = std::min(self->resolution[r].committed[self->nextTr[r]++], commit);
		}
		self->committed[t] = commit;
	}
	// Every resolver's verdict list must be fully consumed.
	for (int r = 0; r < self->resolution.size(); r++)
		ASSERT(self->nextTr[r] == self->resolution[r].committed.size());

	pProxyCommitData->logAdapter->setNextVersion(self->commitVersion);
	self->lockedKey = pProxyCommitData->txnStateStore->readValue(databaseLockedKey).get();
	self->locked = self->lockedKey.present() && self->lockedKey.get().size();

	const Optional mustContainSystemKey =
	    pProxyCommitData->txnStateStore->readValue(mustContainSystemMutationsKey).get();
	if (mustContainSystemKey.present() && mustContainSystemKey.get().size()) {
		// When mustContainSystemMutationsKey is set, demote to conflict any committed transaction
		// that touches no key at or above nonMetadataSystemKeys.end.
		for (int t = 0; t < trs.size(); t++) {
			if (self->committed[t] == ConflictBatch::TransactionCommitted) {
				bool foundSystem = false;
				for (auto& m : trs[t].transaction.mutations) {
					if ((m.type == MutationRef::ClearRange ? m.param2 : m.param1) >= nonMetadataSystemKeys.end) {
						foundSystem = true;
						break;
					}
				}
				if (!foundSystem) {
					self->committed[t] = ConflictBatch::TransactionConflict;
				}
			}
		}
	}
}

// This first pass through committed transactions deals with "metadata" effects (modifications of txnStateStore, changes
// to storage servers' responsibilities)
ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self) {
	auto pProxyCommitData = self->pProxyCommitData;
	const auto& trs = self->trs;

	int t;
	for (t = 0; t < trs.size() && !self->forceRecovery; t++) {
		// Only committed, lock-compatible transactions contribute metadata mutations.
		if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) {
			self->commitCount++;
			applyMetadataMutations(trs[t].spanContext,
			                       *pProxyCommitData,
			                       self->arena,
			                       pProxyCommitData->logSystem,
			                       trs[t].transaction.mutations,
			                       &self->toCommit,
			                       self->forceRecovery,
			                       self->commitVersion + 1,
			                       /* initialCommit= */ false);
		}
		if (self->firstStateMutations) {
			ASSERT(self->committed[t] == ConflictBatch::TransactionCommitted);
			self->firstStateMutations = false;
			self->forceRecovery = false;
		}
	}

	// Sanity-check: the private mutations we generated match what resolver 0 reported.
	auto privateMutations = self->toCommit.getAllMessages();
	ResolveTransactionBatchReply& reply = self->resolution[0];
	ASSERT_WE_THINK(privateMutations.size() == reply.privateMutations.size());
	for (int i = 0; i < privateMutations.size(); i++) {
		// std::cout << i << "\n" << printable(privateMutations[i]) << "\n" << printable(reply.privateMutations[i]) << "\n\n";
		ASSERT_WE_THINK(privateMutations[i] == reply.privateMutations[i]);
	}

	if (self->forceRecovery) {
		// Mark all remaining transactions as conflicted; the transaction subsystem is restarting.
		for (; t < trs.size(); t++)
			self->committed[t] = ConflictBatch::TransactionConflict;
		TraceEvent(SevWarn, "RestartingTxnSubsystem", pProxyCommitData->dbgid).detail("Stage", "AwaitCommit");
	}

	// Capture post-batch lock state and metadata version for the replies sent later.
	self->lockedKey = pProxyCommitData->txnStateStore->readValue(databaseLockedKey).get();
	self->lockedAfter = self->lockedKey.present() && self->lockedKey.get().size();

	self->metadataVersionAfter = pProxyCommitData->txnStateStore->readValue(metadataVersionKey).get();

	auto
fcm = pProxyCommitData->logAdapter->getCommitMessage();
	self->storeCommits.emplace_back(fcm, pProxyCommitData->txnStateStore->commit());
	pProxyCommitData->version = self->commitVersion;
	if (!pProxyCommitData->validState.isSet())
		pProxyCommitData->validState.send(Void());
	ASSERT(self->commitVersion);

	// If the coordinators changed in this batch, hand control to the master; the request never returns normally.
	if (!self->isMyFirstBatch &&
	    pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get() != self->oldCoordinators.get()) {
		wait(brokenPromiseToNever(pProxyCommitData->master.changeCoordinators.getReply(
		    ChangeCoordinatorsRequest(pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get()))));
		ASSERT(false); // ChangeCoordinatorsRequest should always throw
	}

	return Void();
}

// Ask the master (sequencer) for the previous commit version of each written tLog and
// record the answers in self->tpcvMap.
ACTOR Future getTPCV(CommitBatchContext* self, std::set writtenTLogs) {
	state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
	GetTLogPrevCommitVersionReply rep =
	    wait(brokenPromiseToNever(pProxyCommitData->master.getTLogPrevCommitVersion.getReply(
	        GetTLogPrevCommitVersionRequest(writtenTLogs, self->commitVersion, self->prevVersion))));
	// TraceEvent("GetTLogPrevCommitVersionRequest");
	self->tpcvMap.insert(rep.tpcvMap.begin(), rep.tpcvMap.end());
	return Void();
}

/// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers'
/// tags
ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) {
	state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
	state std::vector& trs = self->trs;

	for (; self->transactionNum < trs.size(); self->transactionNum++) {
		// Skip transactions that did not commit or are blocked by the database lock.
		if (!(self->committed[self->transactionNum] == ConflictBatch::TransactionCommitted &&
		      (!self->locked || trs[self->transactionNum].isLockAware()))) {
			continue;
		}

		state bool checkSample = trs[self->transactionNum].commitCostEstimation.present();
		state Optional* trCost = &trs[self->transactionNum].commitCostEstimation;
		state int mutationNum = 0;
		state VectorRef* pMutations = &trs[self->transactionNum].transaction.mutations;
		self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);

		for (; mutationNum < pMutations->size(); mutationNum++) {
			// Yield periodically so a large batch does not monopolize the run loop.
			if (self->yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
				self->yieldBytes = 0;
				if (g_network->check_yield(TaskPriority::ProxyCommitYield1)) {
					self->computeDuration += g_network->timer() - self->computeStart;
					wait(delay(0, TaskPriority::ProxyCommitYield1));
					self->computeStart = g_network->timer();
				}
			}

			auto& m = (*pMutations)[mutationNum];
			self->mutationCount++;
			self->mutationBytes += m.expectedSize();
			self->yieldBytes += m.expectedSize();
			// Determine the set of tags (responsible storage servers) for the mutation, splitting it
			// if necessary.  Serialize (splits of) the mutation into the message buffer and add the tags.
			if (isSingleKeyMutation((MutationRef::Type)m.type)) {
				auto& tags = pProxyCommitData->tagsForKey(m.param1);

				// sample single key mutation based on cost
				// the expectation of sampling is every COMMIT_SAMPLE_COST sample once
				if (checkSample) {
					double totalCosts = trCost->get().writeCosts;
					double cost = getWriteOperationCost(m.expectedSize());
					double mul = std::max(1.0, totalCosts / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_COST));
					ASSERT(totalCosts > 0);
					double prob = mul * cost / totalCosts;

					if (deterministicRandom()->random01() < prob) {
						for (const auto& ssInfo : pProxyCommitData->keyInfo[m.param1].src_info) {
							auto id = ssInfo->interf.id();
							// scale cost
							cost = cost < CLIENT_KNOBS->COMMIT_SAMPLE_COST ? CLIENT_KNOBS->COMMIT_SAMPLE_COST : cost;
							pProxyCommitData->updateSSTagCost(id, trs[self->transactionNum].tagSet.get(), m, cost);
						}
					}
				}

				if (pProxyCommitData->singleKeyMutationEvent->enabled) {
					KeyRangeRef shard = pProxyCommitData->keyInfo.rangeContaining(m.param1).range();
					pProxyCommitData->singleKeyMutationEvent->tag1 = (int64_t)tags[0].id;
					pProxyCommitData->singleKeyMutationEvent->tag2 = (int64_t)tags[1].id;
					pProxyCommitData->singleKeyMutationEvent->tag3 = (int64_t)tags[2].id;
					pProxyCommitData->singleKeyMutationEvent->shardBegin = shard.begin;
					pProxyCommitData->singleKeyMutationEvent->shardEnd = shard.end;
					pProxyCommitData->singleKeyMutationEvent->log();
				}

				DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid).detail("To", tags);
				self->toCommit.addTags(tags);
				if (pProxyCommitData->cacheInfo[m.param1]) {
					self->toCommit.addTag(cacheTag);
				}
				self->toCommit.writeTypedMessage(m);
			} else if (m.type == MutationRef::ClearRange) {
				KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
				auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
				auto firstRange = ranges.begin();
				++firstRange;
				if (firstRange == ranges.end()) {
					// Fast path: the clear lies entirely within a single shard.
					DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
					    .detail("To", ranges.begin().value().tags);
					ranges.begin().value().populateTags();
					self->toCommit.addTags(ranges.begin().value().tags);

					// check whether clear is sampled
					if (checkSample && !trCost->get().clearIdxCosts.empty() &&
					    trCost->get().clearIdxCosts[0].first == mutationNum) {
						for (const auto& ssInfo : ranges.begin().value().src_info) {
							auto id = ssInfo->interf.id();
							pProxyCommitData->updateSSTagCost(
							    id, trs[self->transactionNum].tagSet.get(), m, trCost->get().clearIdxCosts[0].second);
						}
						trCost->get().clearIdxCosts.pop_front();
					}
				} else {
					TEST(true); // A clear range extends past a shard boundary
					std::set allSources;
					for (auto r : ranges) {
						r.value().populateTags();
						allSources.insert(r.value().tags.begin(), r.value().tags.end());

						// check whether clear is sampled
						if (checkSample && !trCost->get().clearIdxCosts.empty() &&
						    trCost->get().clearIdxCosts[0].first == mutationNum) {
							for (const auto& ssInfo : r.value().src_info) {
								auto id = ssInfo->interf.id();
								pProxyCommitData->updateSSTagCost(
								    id, trs[self->transactionNum].tagSet.get(), m,
								    trCost->get().clearIdxCosts[0].second);
							}
							trCost->get().clearIdxCosts.pop_front();
						}
					}
					DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
					    .detail("Dbgid", pProxyCommitData->dbgid)
					    .detail("To", allSources)
					    .detail("Mutation", m);

					self->toCommit.addTags(allSources);
				}

				if (pProxyCommitData->needsCacheTag(clearRange)) {
					self->toCommit.addTag(cacheTag);
				}
				self->toCommit.writeTypedMessage(m);
			} else {
				UNREACHABLE();
			}

			// Check on backing up key, if backup ranges are defined and a normal key
			if (!(pProxyCommitData->vecBackupKeys.size() > 1 &&
			      (normalKeys.contains(m.param1) || m.param1 == metadataVersionKey))) {
				continue;
			}

			if (m.type != MutationRef::Type::ClearRange) {
				// Add the mutation to the relevant backup tag
				for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
					self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, m);
				}
			} else {
				KeyRangeRef mutationRange(m.param1, m.param2);
				KeyRangeRef intersectionRange;

				// Identify and add the intersecting ranges of the mutation to the array of mutations to serialize
				for (auto backupRange : pProxyCommitData->vecBackupKeys.intersectingRanges(mutationRange)) {
					// Get the backup sub range
					const auto& backupSubrange = backupRange.range();

					// Determine the intersecting range
					intersectionRange = mutationRange & backupSubrange;

					// Create the custom mutation for the specific backup tag
					MutationRef backupMutation(
					    MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);

					// Add the mutation to the relevant backup tag
					for (auto backupName : backupRange.value()) {
						self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena,
backupMutation);
					}
				}
			}
		}

		if (checkSample) {
			self->pProxyCommitData->stats.txnExpensiveClearCostEstCount +=
			    trs[self->transactionNum].commitCostEstimation.get().expensiveCostEstCount;
		}
	}

	return Void();
}

// Post-resolution phase of the commit pipeline: applies metadata effects, assigns mutations to
// storage-server tags, and pushes the batch to the transaction logs.
ACTOR Future postResolution(CommitBatchContext* self) {
	state double postResolutionStart = now();
	state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
	state std::vector& trs = self->trs;
	state const int64_t localBatchNumber = self->localBatchNumber;
	state const Optional& debugID = self->debugID;
	state Span span("MP:postResolution"_loc, self->span.context);

	// Batches must be processed in order: wait until the previous local batch has reached logging.
	bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1;
	TEST(queuedCommits); // Queuing post-resolution commit processing
	wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
	state double postResolutionQueuing = now();
	pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart);
	wait(yield(TaskPriority::ProxyCommitYield1));
	self->computeStart = g_network->timer();

	pProxyCommitData->stats.txnCommitResolved += trs.size();

	if (debugID.present()) {
		g_traceBatch.addEvent(
		    "CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ProcessingMutations");
	}

	self->isMyFirstBatch = !pProxyCommitData->version;
	self->oldCoordinators = pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get();

	assertResolutionStateMutationsSizeConsistent(self->resolution);

	applyMetadataEffect(self);

	determineCommittedTransactions(self);

	if (self->forceRecovery) {
		wait(Future(Never()));
	}

	if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && self->metadataMutationFromProxy &&
	    !self->hasMetadataMutation) {
		// TraceEvent("Abort metadataMutationFromProxy");
		// Abort the whole batch: a metadata mutation surfaced via another proxy's state transactions
		// but this batch was not flagged as containing one.
		for (int transactionNum = 0; transactionNum < trs.size(); transactionNum++) {
			self->committed[transactionNum] = ConflictBatch::TransactionConflict;
		}
	}

	// First pass
	wait(applyMetadataToCommittedTransactions(self));

	// Second pass
	wait(assignMutationsToStorageServers(self));

	self->toCommit.saveTags(self->writtenTags);

	if (self->writtenTags.size() && !self->metadataMutationFromProxy &&
	    SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
		// confirm all serialized tags are sent to a tLog for which the previous commit version was obtained.
		std::set postResolutionTLogs;
		self->toCommit.getLocations(self->writtenTags, postResolutionTLogs);
		for (auto& t : postResolutionTLogs) {
			if (self->writtenTLogs.find(t) == self->writtenTLogs.end()) {
				TraceEvent(SevError, "TagHasNoPCV", pProxyCommitData->dbgid)
				    .detail("tagsBeforeResolution", self->writtenTagsPreResolution)
				    .detail("tagsAfterResolution", self->writtenTags)
				    .detail("numTLogsBeforeResolution", self->writtenTLogs.size())
				    .detail("numTLogsAfterResolution", postResolutionTLogs.size())
				    .detail("hasMetadataMutation", self->hasMetadataMutation)
				    .detail("metadataMutationFromProxy", self->metadataMutationFromProxy);
				ASSERT(false);
			}
		}
	}

	// Serialize and backup the mutations as a single mutation
	if ((pProxyCommitData->vecBackupKeys.size() > 1) && self->logRangeMutations.size()) {
		wait(addBackupMutations(pProxyCommitData,
		                        &self->logRangeMutations,
		                        &self->toCommit,
		                        self->commitVersion,
		                        &self->computeDuration,
		                        &self->computeStart));
	}

	pProxyCommitData->stats.mutations += self->mutationCount;
	pProxyCommitData->stats.mutationBytes += self->mutationBytes;

	// Storage servers mustn't make durable versions which are not fully committed (because then they are impossible to
	// roll back) We prevent this by limiting the number of versions which are semi-committed but not fully committed to
	// be less than the MVCC window
	if (pProxyCommitData->committedVersion.get() <
	    self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
		self->computeDuration += g_network->timer() - self->computeStart;
		state Span waitVersionSpan;
		while (pProxyCommitData->committedVersion.get() <
		       self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
			// This should be *extremely* rare in the real world, but knob buggification should make it happen in
			// simulation
			TEST(true); // Semi-committed pipeline limited by MVCC window
			//TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion);
			waitVersionSpan = Span(
			    deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, { span.context });
			choose {
				when(wait(pProxyCommitData->committedVersion.whenAtLeast(
				    self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
					wait(yield());
					break;
				}
				when(wait(pProxyCommitData->cx->onProxiesChanged())) {}
				// @todo probably there is no need to get the (entire) version vector from the sequencer
				// in this case, and if so, consider adding a flag to the request to tell the sequencer
				// to not send the version vector information.
				when(GetRawCommittedVersionReply v = wait(pProxyCommitData->master.getLiveCommittedVersion.getReply(
				         GetRawCommittedVersionRequest(waitVersionSpan.context, debugID, invalidVersion),
				         TaskPriority::GetLiveCommittedVersionReply))) {
					if (v.version > pProxyCommitData->committedVersion.get()) {
						pProxyCommitData->locked = v.locked;
						pProxyCommitData->metadataVersion = v.metadataVersion;
						pProxyCommitData->committedVersion.set(v.version);
					}

					if (pProxyCommitData->committedVersion.get() <
					    self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
						wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
				}
			}
		}
		waitVersionSpan = Span{};
		self->computeStart = g_network->timer();
	}

	self->msg = self->storeCommits.back().first.get();

	if (self->debugID.present())
		g_traceBatch.addEvent(
		    "CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterStoreCommits");

	// txnState (transaction subsystem state) tag: message extracted from log adapter
	bool firstMessage = true;
	for (auto m : self->msg.messages) {
		if (firstMessage) {
			self->toCommit.addTxsTag();
		}
self->toCommit.writeMessage(StringRef(m.begin(), m.size()), !firstMessage); firstMessage = false; } if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) debug_advanceMaxCommittedVersion(UID(), self->commitVersion); //< Is this valid? // TraceEvent("ProxyPush", pProxyCommitData->dbgid) // .detail("PrevVersion", self->prevVersion) // .detail("Version", self->commitVersion) // .detail("TransactionsSubmitted", trs.size()) // .detail("TransactionsCommitted", self->commitCount) // .detail("TxsPopTo", self->msg.popTo); if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) debug_advanceMaxCommittedVersion(UID(), self->commitVersion); self->commitStartTime = now(); pProxyCommitData->lastStartCommit = self->commitStartTime; Optional> tpcvMap = Optional>(); if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) { tpcvMap = self->tpcvMap; } self->loggingComplete = pProxyCommitData->logSystem->push(self->prevVersion, self->commitVersion, pProxyCommitData->committedVersion.get(), pProxyCommitData->minKnownCommittedVersion, self->toCommit, span.context, self->debugID, tpcvMap); float ratio = self->toCommit.getEmptyMessageRatio(); pProxyCommitData->stats.commitBatchingEmptyMessageRatio.addMeasurement(ratio); if (!self->forceRecovery) { ASSERT(pProxyCommitData->latestLocalCommitBatchLogging.get() == self->localBatchNumber - 1); pProxyCommitData->latestLocalCommitBatchLogging.set(self->localBatchNumber); } self->computeDuration += g_network->timer() - self->computeStart; if (self->batchOperations > 0) { double computePerOperation = std::min(SERVER_KNOBS->MAX_COMPUTE_PER_OPERATION, self->computeDuration / self->batchOperations); if (computePerOperation <= pProxyCommitData->commitComputePerOperation[self->latencyBucket]) { pProxyCommitData->commitComputePerOperation[self->latencyBucket] = computePerOperation; } else { pProxyCommitData->commitComputePerOperation[self->latencyBucket] = 
			    SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE * computePerOperation +
			    ((1.0 - SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE) *
			     pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
		}
		pProxyCommitData->stats.maxComputeNS =
		    std::max(pProxyCommitData->stats.maxComputeNS,
		             1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
		pProxyCommitData->stats.minComputeNS =
		    std::min(pProxyCommitData->stats.minComputeNS,
		             1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
	}

	pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing);
	return Void();
}

// Logging phase: wait for the tLog push started in postResolution to complete (or for the
// committed version to pass this batch), then record txs pop versions for remote logs.
ACTOR Future transactionLogging(CommitBatchContext* self) {
	state double tLoggingStart = now();
	state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
	state Span span("MP:transactionLogging"_loc, self->span.context);

	try {
		choose {
			when(Version ver = wait(self->loggingComplete)) {
				pProxyCommitData->minKnownCommittedVersion = std::max(pProxyCommitData->minKnownCommittedVersion, ver);
			}
			when(wait(pProxyCommitData->committedVersion.whenAtLeast(self->commitVersion + 1))) {}
		}
	} catch (Error& e) {
		// A broken promise from the log system means the tLogs failed; surface it as such.
		if (e.code() == error_code_broken_promise) {
			throw master_tlog_failed();
		}
		throw;
	}

	pProxyCommitData->lastCommitLatency = now() - self->commitStartTime;
	pProxyCommitData->lastCommitTime = std::max(pProxyCommitData->lastCommitTime.get(), self->commitStartTime);

	wait(yield(TaskPriority::ProxyCommitYield2));

	// Remember how far txs data can be popped, keeping a bounded history for remote logs.
	if (pProxyCommitData->popRemoteTxs &&
	    self->msg.popTo > (pProxyCommitData->txsPopVersions.size() ? pProxyCommitData->txsPopVersions.back().second
	                                                               : pProxyCommitData->lastTxsPop)) {
		if (pProxyCommitData->txsPopVersions.size() >= SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) {
			TraceEvent(SevWarnAlways, "DiscardingTxsPopHistory").suppressFor(1.0);
			pProxyCommitData->txsPopVersions.pop_front();
		}

		pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo);
	}
	pProxyCommitData->logSystem->popTxs(self->msg.popTo);

	pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart);
	return Void();
}

// Reply phase: report the committed version to the master, then answer every client in the batch.
ACTOR Future reply(CommitBatchContext* self) {
	state double replyStart = now();
	state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
	state Span span("MP:reply"_loc, self->span.context);

	const Optional& debugID = self->debugID;

	// Sim-validation hook: this version is now durable.
	if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2)
		debug_advanceMinCommittedVersion(UID(), self->commitVersion);

	// TraceEvent("ProxyPushed", pProxyCommitData->dbgid)
	//     .detail("PrevVersion", self->prevVersion)
	//     .detail("Version", self->commitVersion);
	if (debugID.present())
		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.AfterLogPush");

	// Acknowledge (discard) the queued txnStateStore commit messages now that logging finished.
	for (auto& p : self->storeCommits) {
		ASSERT(!p.second.isReady());
		p.first.get().acknowledge.send(Void());
		ASSERT(p.second.isReady());
	}

	// After logging finishes, we report the commit version to master so that every other proxy can get the most
	// up-to-date live committed version. We also maintain the invariant that master's committed version >=
	// self->committedVersion by reporting commit version first before updating self->committedVersion. Otherwise, a
	// client may get a commit version that the master is not aware of, and next GRV request may get a version less than
	// self->committedVersion.
	TEST(pProxyCommitData->committedVersion.get() > self->commitVersion); // later version was reported committed first

	if (self->commitVersion >= pProxyCommitData->committedVersion.get()) {
		state Optional> writtenTags;
		if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
			writtenTags = self->writtenTags;
		}
		wait(pProxyCommitData->master.reportLiveCommittedVersion.getReply(
		    ReportRawCommittedVersionRequest(self->commitVersion,
		                                     self->lockedAfter,
		                                     self->metadataVersionAfter,
		                                     pProxyCommitData->minKnownCommittedVersion,
		                                     self->prevVersion,
		                                     writtenTags),
		    TaskPriority::ProxyMasterVersionReply));
	}

	// Only advance local committed state after the master has been told (see invariant comment above).
	if (self->commitVersion > pProxyCommitData->committedVersion.get()) {
		pProxyCommitData->locked = self->lockedAfter;
		pProxyCommitData->metadataVersion = self->metadataVersionAfter;
		pProxyCommitData->committedVersion.set(self->commitVersion);
	}

	if (self->forceRecovery) {
		TraceEvent(SevWarn, "RestartingTxnSubsystem", pProxyCommitData->dbgid).detail("Stage", "ProxyShutdown");
		throw worker_removed();
	}

	// Send replies to clients
	double endTime = g_network->timer();
	// Reset all to zero, used to track the correct index of each commitTransacitonRef on each resolver
	std::fill(self->nextTr.begin(), self->nextTr.end(), 0);
	for (int t = 0; t < self->trs.size(); t++) {
		auto& tr = self->trs[t];
		if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || tr.isLockAware())) {
			ASSERT_WE_THINK(self->commitVersion != invalidVersion);
			tr.reply.send(CommitID(self->commitVersion, t, self->metadataVersionAfter));
		} else if (self->committed[t] == ConflictBatch::TransactionTooOld) {
			tr.reply.sendError(transaction_too_old());
		} else {
			// If enable the option to report conflicting keys from resolvers, we send back all keyranges' indices
			// through CommitID
			if (tr.transaction.report_conflicting_keys) {
				Standalone> conflictingKRIndices;
				for (int resolverInd : self->transactionResolverMap[t]) {
					auto const& cKRs =
					    self->resolution[resolverInd]
					        .conflictingKeyRangeMap[self->nextTr[resolverInd]]; // nextTr[resolverInd] -> index of this
					                                                           // trs[t] on the resolver
					for (auto const& rCRIndex : cKRs)
						// read_conflict_range can change when sent to resolvers, mapping the index from resolver-side
						// to original index in commitTransactionRef
						conflictingKRIndices.push_back(conflictingKRIndices.arena(),
						                               self->txReadConflictRangeIndexMap[t][resolverInd][rCRIndex]);
				}
				// At least one keyRange index should be returned
				ASSERT(conflictingKRIndices.size());
				tr.reply.send(CommitID(
				    invalidVersion, t, Optional(), Optional>>(conflictingKRIndices)));
			} else {
				tr.reply.sendError(not_committed());
			}
		}

		// Update corresponding transaction indices on each resolver
		for (int resolverInd : self->transactionResolverMap[t])
			self->nextTr[resolverInd]++;

		// TODO: filter if pipelined with large commit
		const double duration = endTime - tr.requestTime();
		pProxyCommitData->stats.commitLatencySample.addMeasurement(duration);
		if (pProxyCommitData->latencyBandConfig.present()) {
			bool filter = self->maxTransactionBytes >
			              pProxyCommitData->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(
			                  std::numeric_limits::max());
			pProxyCommitData->stats.commitLatencyBands.addMeasurement(duration, filter);
		}
	}

	++pProxyCommitData->stats.commitBatchOut;
	pProxyCommitData->stats.txnCommitOut += self->trs.size();
	pProxyCommitData->stats.txnConflicts += self->trs.size() - self->commitCount;
	pProxyCommitData->stats.txnCommitOutSuccess += self->commitCount;

	// Periodically trim keyResolvers history that is older than the write-transaction MVCC window.
	if (now() - pProxyCommitData->lastCoalesceTime > SERVER_KNOBS->RESOLVER_COALESCE_TIME) {
		pProxyCommitData->lastCoalesceTime = now();
		int lastSize = pProxyCommitData->keyResolvers.size();
		auto rs = pProxyCommitData->keyResolvers.ranges();
		Version oldestVersion = self->prevVersion - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
		for (auto r = rs.begin(); r != rs.end(); ++r) {
			while (r->value().size() > 1 && r->value()[1].first < oldestVersion)
				r->value().pop_front();
			if (r->value().size() && r->value().front().first < oldestVersion)
				r->value().front().first = 0;
		}
		pProxyCommitData->keyResolvers.coalesce(allKeys);
		if (pProxyCommitData->keyResolvers.size() != lastSize)
			TraceEvent("KeyResolverSize", pProxyCommitData->dbgid)
			    .detail("Size", pProxyCommitData->keyResolvers.size());
	}

	// Dynamic batching for commits
	double target_latency =
	    (now() - self->startTime) * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
	pProxyCommitData->commitBatchInterval =
	    std::max(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN,
	             std::min(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MAX,
	                      target_latency * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA +
	                          pProxyCommitData->commitBatchInterval *
	                              (1 - SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
	pProxyCommitData->stats.commitBatchingWindowSize.addMeasurement(pProxyCommitData->commitBatchInterval);
	pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount;
	ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
	wait(self->releaseFuture);

	pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart);
	return Void();
}

} // namespace CommitBatch

// Commit one batch of transactions trs
ACTOR Future commitBatch(ProxyCommitData* self, std::vector* trs, int currentBatchMemBytesCount) {
	// WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as
	// possible
	state CommitBatch::CommitBatchContext context(self, trs, currentBatchMemBytesCount);
	getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::Commit;

	// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches)
	// so we need to downgrade here
	wait(delay(0, TaskPriority::ProxyCommit));

	context.pProxyCommitData->lastVersionTime = context.startTime;
	++context.pProxyCommitData->stats.commitBatchIn;
	context.setupTraceBatch();

	/////// Phase 1:
	/// Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined
	/// and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
	wait(CommitBatch::preresolutionProcessing(&context));
	if (context.rejected) {
		self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
		return Void();
	}

	/////// Phase 2: Resolution (waiting on the network; pipelined)
	wait(CommitBatch::getResolution(&context));

	////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but
	/// doesn't need to be)
	wait(CommitBatch::postResolution(&context));

	/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
	wait(CommitBatch::transactionLogging(&context));

	/////// Phase 5: Replies (CPU bound; no particular order required, though ordered execution would be best for
	/// latency)
	wait(CommitBatch::reply(&context));

	return Void();
}

// Add tss mapping data to the reply, if any of the included storage servers have a TSS pair
void maybeAddTssMapping(GetKeyServerLocationsReply& reply,
                        ProxyCommitData* commitData,
                        std::unordered_set& included,
                        UID ssId) {
	// `included` deduplicates: each storage server's mapping is appended at most once.
	if (!included.count(ssId)) {
		auto mappingItr = commitData->tssMapping.find(ssId);
		if (mappingItr != commitData->tssMapping.end()) {
			reply.resultsTssMapping.push_back(*mappingItr);
		}
		included.insert(ssId);
	}
}

// Attach the storage tag of every storage server referenced by the reply, looked up in the
// proxy's storage cache.
void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitData) {
	for (const auto& [_, shard] : reply.results) {
		for (auto& ssi : shard) {
			auto iter = commitData->storageCache.find(ssi.id());
			ASSERT_WE_THINK(iter != commitData->storageCache.end());
			reply.resultsTagMapping.emplace_back(ssi.id(), iter->second->tag);
		}
	}
}

// Answer a single getKeyServersLocations request: forward, reverse, and single-key lookups
// against the proxy's keyInfo map.
ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
	// We can't respond to these requests until we have valid txnStateStore
	getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations;
	getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
	wait(commitData->validState.getFuture());
	wait(delay(0, TaskPriority::DefaultEndpoint));

	std::unordered_set tssMappingsIncluded;
	GetKeyServerLocationsReply rep;
	if (!req.end.present()) {
		// Single-key lookup (optionally of the range before the key when reverse is set).
		auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin)
		                     : commitData->keyInfo.rangeContaining(req.begin);
		std::vector ssis;
		ssis.reserve(r.value().src_info.size());
		for (auto& it : r.value().src_info) {
			ssis.push_back(it->interf);
			maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
		}
		rep.results.emplace_back(r.range(), ssis);
	} else if (!req.reverse) {
		// Forward range scan: up to req.limit shards in [begin, end).
		int count = 0;
		for (auto r = commitData->keyInfo.rangeContaining(req.begin);
		     r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get();
		     ++r) {
			std::vector ssis;
			ssis.reserve(r.value().src_info.size());
			for (auto& it : r.value().src_info) {
				ssis.push_back(it->interf);
				maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
			}
			rep.results.emplace_back(r.range(), ssis);
			count++;
		}
	} else {
		// Reverse range scan: walk backwards from the range before `end`.
		int count = 0;
		auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get());
		while (count < req.limit && req.begin < r.end()) {
			std::vector ssis;
			ssis.reserve(r.value().src_info.size());
			for (auto& it : r.value().src_info) {
				ssis.push_back(it->interf);
				maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
			}
			rep.results.emplace_back(r.range(), ssis);
			if (r == commitData->keyInfo.ranges().begin()) {
				break;
			}
			count++;
			--r;
		}
	}
	addTagMapping(rep, commitData);
	req.reply.send(rep);
	++commitData->stats.keyServerLocationOut;
	return Void();
}

// Dispatch loop for getKeyServersLocations requests; sheds load when the in-flight queue grows
// beyond KEY_LOCATION_MAX_QUEUE_SIZE.
ACTOR static Future readRequestServer(CommitProxyInterface proxy,
                                      PromiseStream> addActor,
                                      ProxyCommitData* commitData) {
	loop {
		GetKeyServerLocationsRequest req =
		    waitNext(proxy.getKeyServersLocations.getFuture());
		// WARNING: this code is run at a high priority, so it needs to do as little work as possible
		if (req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && // Always do data distribution requests
		    commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() >
		        SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
			++commitData->stats.keyServerLocationErrors;
			req.reply.sendError(proxy_memory_limit_exceeded());
			TraceEvent(SevWarnAlways, "ProxyLocationRequestThresholdExceeded").suppressFor(60);
		} else {
			++commitData->stats.keyServerLocationIn;
			addActor.send(doKeyServerLocationRequest(req, commitData));
		}
	}
}

// Serve storage-server rejoin requests from txnStateStore: returns the server's tag, tag
// history, and (when its locality changed or is new) a freshly allocated tag.
ACTOR static Future rejoinServer(CommitProxyInterface proxy, ProxyCommitData* commitData) {
	// We can't respond to these requests until we have valid txnStateStore
	wait(commitData->validState.getFuture());

	TraceEvent("ProxyReadyForReads", proxy.id()).log();

	loop {
		GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
		if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
			GetStorageServerRejoinInfoReply rep;
			rep.version = commitData->version;
			rep.tag = decodeServerTagValue(commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get());
			RangeResult history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
			// History is returned newest-first.
			for (int i = history.size() - 1; i >= 0; i--) {
				rep.history.push_back(
				    std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)));
			}
			auto localityKey = commitData->txnStateStore->readValue(tagLocalityListKeyFor(req.dcId)).get();
			rep.newLocality = false;
			if (localityKey.present()) {
				int8_t locality = decodeTagLocalityListValue(localityKey.get());
				if (rep.tag.locality != tagLocalityUpgraded && locality != rep.tag.locality) {
					TraceEvent(SevWarnAlways, "SSRejoinedWithChangedLocality")
					    .detail("Tag", rep.tag.toString())
					    .detail("DcId", req.dcId)
					    .detail("NewLocality", locality);
				} else if (locality != rep.tag.locality) {
					// Allocate the smallest tag id in this locality that is not used by any live
					// server tag or by any tag in the history.
					uint16_t tagId = 0;
					std::vector usedTags;
					auto tagKeys = commitData->txnStateStore->readRange(serverTagKeys).get();
					for (auto& kv : tagKeys) {
						Tag t = decodeServerTagValue(kv.value);
						if (t.locality == locality) {
							usedTags.push_back(t.id);
						}
					}
					auto historyKeys = commitData->txnStateStore->readRange(serverTagHistoryKeys).get();
					for (auto& kv : historyKeys) {
						Tag t = decodeServerTagValue(kv.value);
						if (t.locality == locality) {
							usedTags.push_back(t.id);
						}
					}
					std::sort(usedTags.begin(), usedTags.end());

					// Scan for the first gap in the sorted used-tag ids.
					int usedIdx = 0;
					for (; usedTags.size() > 0 && tagId <= usedTags.end()[-1]; tagId++) {
						if (tagId < usedTags[usedIdx]) {
							break;
						} else {
							usedIdx++;
						}
					}
					rep.newTag = Tag(locality, tagId);
				}
			} else if (rep.tag.locality != tagLocalityUpgraded) {
				TraceEvent(SevWarnAlways, "SSRejoinedWithUnknownLocality")
				    .detail("Tag", rep.tag.toString())
				    .detail("DcId", req.dcId);
			} else {
				// Unknown dcId: allocate a brand-new locality one past the current maximum.
				rep.newLocality = true;
				int8_t maxTagLocality = -1;
				auto localityKeys = commitData->txnStateStore->readRange(tagLocalityListKeys).get();
				for (auto& kv : localityKeys) {
					maxTagLocality = std::max(maxTagLocality, decodeTagLocalityListValue(kv.value));
				}
				rep.newTag = Tag(maxTagLocality + 1, 0);
			}
			req.reply.send(rep);
		} else {
			req.reply.sendError(worker_removed());
		}
	}
}

// Forward GetDDMetrics requests to the data distributor (when present) and relay the response.
ACTOR Future ddMetricsRequestServer(CommitProxyInterface proxy, Reference const> db) {
	loop {
		choose {
			when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture())) {
				if (!db->get().distributor.present()) {
					req.reply.sendError(dd_not_found());
					continue;
				}
				ErrorOr reply = wait(errorOr(db->get().distributor.get().dataDistributorMetrics.getReply(
				    GetDataDistributorMetricsRequest(req.keys, req.shardLimit))));
				if (reply.isError()) {
					req.reply.sendError(reply.getError());
				} else {
					GetDDMetricsReply newReply;
					newReply.storageMetricsList = reply.get().storageMetricsList;
					req.reply.send(newReply);
				}
			}
		}
	}
}

// Poll remote tLogs' queuing metrics and pop txs data up to the minimum version all remote
// logs have durably received.
ACTOR Future
// Tracks remote tlogs' committed versions and pops fully-replicated txs tags.
// Restarts its inner loop whenever the ServerDBInfo changes.
monitorRemoteCommitted(ProxyCommitData* self) {
	loop {
		wait(delay(0)); // allow this actor to be cancelled if we are removed after db changes.
		state Optional>> remoteLogs;
		if (self->db->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
			// Find the (single) non-local log set; only usable if every tlog interface is present.
			for (auto& logSet : self->db->get().logSystemConfig.tLogs) {
				if (!logSet.isLocal) {
					remoteLogs = logSet.tLogs;
					for (auto& tLog : logSet.tLogs) {
						if (!tLog.present()) {
							remoteLogs = Optional>>();
							break;
						}
					}
					break;
				}
			}
		}

		if (!remoteLogs.present()) {
			// No usable remote log set yet; wait for the next db change and retry.
			wait(self->db->onChange());
			continue;
		}
		self->popRemoteTxs = true;

		state Future onChange = self->db->onChange();
		loop {
			// Query queuing metrics from every remote tlog; a db change aborts the round.
			state std::vector> replies;
			for (auto& it : remoteLogs.get()) {
				replies.push_back(
				    brokenPromiseToNever(it.interf().getQueuingMetrics.getReply(TLogQueuingMetricsRequest())));
			}
			wait(waitForAll(replies) || onChange);

			if (onChange.isReady()) {
				break;
			}

			// FIXME: use the configuration to calculate a more precise minimum recovery version.
			Version minVersion = std::numeric_limits::max();
			for (auto& it : replies) {
				minVersion = std::min(minVersion, it.get().v);
			}

			// Pop every queued txs version that all remote logs have passed.
			while (self->txsPopVersions.size() && self->txsPopVersions.front().first <= minVersion) {
				self->lastTxsPop = self->txsPopVersions.front().second;
				self->logSystem->popTxs(self->txsPopVersions.front().second, tagLocalityRemoteLog);
				self->txsPopVersions.pop_front();
			}

			wait(delay(SERVER_KNOBS->UPDATE_REMOTE_LOG_VERSION_INTERVAL) || onChange);
			if (onChange.isReady()) {
				break;
			}
		}
	}
}

// Validates and executes a snapshot request: whitelist check, recovery-state check,
// log_anti_quorum check, then forwards the snapshot to the data distributor.
ACTOR Future proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* commitData) {
	TraceEvent("SnapCommitProxy_SnapReqEnter")
	    .detail("SnapPayload", snapReq.snapPayload)
	    .detail("SnapUID", snapReq.snapUID);
	try {
		// whitelist check
		ExecCmdValueString execArg(snapReq.snapPayload);
		StringRef binPath = execArg.getBinaryPath();
		if (!isWhitelisted(commitData->whitelistedBinPathVec, binPath)) {
			TraceEvent("SnapCommitProxy_WhiteListCheckFailed")
			    .detail("SnapPayload", snapReq.snapPayload)
			    .detail("SnapUID", snapReq.snapUID);
			throw
// (Continuation of proxySnapCreate's whitelist-failure branch.)
			    snap_path_not_whitelisted();
		}
		// db fully recovered check
		if (commitData->db->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
			// Cluster is not fully recovered and needs TLogs
			// from previous generation for full recovery.
			// Currently, snapshot of old tlog generation is not
			// supported and hence failing the snapshot request until
			// cluster is fully_recovered.
			TraceEvent("SnapCommitProxy_ClusterNotFullyRecovered")
			    .detail("SnapPayload", snapReq.snapPayload)
			    .detail("SnapUID", snapReq.snapUID);
			throw snap_not_fully_recovered_unsupported();
		}

		auto result =
		    commitData->txnStateStore->readValue(LiteralStringRef("log_anti_quorum").withPrefix(configKeysPrefix))
		        .get();
		int logAntiQuorum = 0;
		if (result.present()) {
			logAntiQuorum = atoi(result.get().toString().c_str());
		}
		// FIXME: logAntiQuorum not supported, remove it later,
		// In version2, we probably don't need this limitation, but this needs to be tested.
		if (logAntiQuorum > 0) {
			TraceEvent("SnapCommitProxy_LogAnitQuorumNotSupported")
			    .detail("SnapPayload", snapReq.snapPayload)
			    .detail("SnapUID", snapReq.snapUID);
			throw snap_log_anti_quorum_unsupported();
		}

		// send a snap request to DD
		if (!commitData->db->get().distributor.present()) {
			TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
			throw dd_not_found();
		}
		state Future> ddSnapReq =
		    commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(
		        DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
		try {
			wait(throwErrorOr(ddSnapReq));
		} catch (Error& e) {
			TraceEvent("SnapCommitProxy_DDSnapResponseError")
			    .detail("SnapPayload", snapReq.snapPayload)
			    .detail("SnapUID", snapReq.snapUID)
			    .error(e, true /*includeCancelled*/);
			throw e;
		}
		snapReq.reply.send(Void());
	} catch (Error& e) {
		TraceEvent("SnapCommitProxy_SnapReqError")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID)
		    .error(e, true /*includeCancelled*/);
		// Report failures to the requester, but propagate cancellation to our caller.
		if (e.code() != error_code_operation_cancelled) {
			snapReq.reply.sendError(e);
		} else {
			throw e;
		}
	}
	TraceEvent("SnapCommitProxy_SnapReqExit")
	    .detail("SnapPayload", snapReq.snapPayload)
	    .detail("SnapUID", snapReq.snapUID);
	return Void();
}

// Asks the data distributor whether excluding the requested servers is safe and
// relays the answer; replies "unsafe" (false) when no distributor is available.
ACTOR Future proxyCheckSafeExclusion(Reference const> db, ExclusionSafetyCheckRequest req) {
	TraceEvent("SafetyCheckCommitProxyBegin").log();
	state ExclusionSafetyCheckReply reply(false);
	if (!db->get().distributor.present()) {
		TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "ExclusionSafetyCheck");
		req.reply.send(reply);
		return Void();
	}
	try {
		state Future> safeFuture =
		    db->get().distributor.get().distributorExclCheckReq.tryGetReply(
		        DistributorExclusionSafetyCheckRequest(req.exclusions));
		DistributorExclusionSafetyCheckReply _reply = wait(throwErrorOr(safeFuture));
		reply.safe = _reply.safe;
	} catch (Error& e) {
		TraceEvent("SafetyCheckCommitProxyResponseError").error(e);
		// Report failures to the requester, but propagate cancellation to our caller.
		if (e.code() != error_code_operation_cancelled) {
			req.reply.sendError(e);
			return Void();
		} else {
			throw e;
		}
	}
	TraceEvent("SafetyCheckCommitProxyFinish").log();
	req.reply.send(reply);
	return Void();
}

// Periodically reports per-tag commit cost estimates to the ratekeeper, tracking
// ratekeeper appearance/disappearance via db changes. Clears the accumulated map
// after each successful report.
ACTOR Future reportTxnTagCommitCost(UID myID,
                                    Reference const> db,
                                    UIDTransactionTagMap* ssTrTagCommitCost) {
	state Future nextRequestTimer = Never();
	state Future nextReply = Never();
	if (db->get().ratekeeper.present())
		nextRequestTimer = Void();
	loop choose {
		when(wait(db->onChange())) {
			if (db->get().ratekeeper.present()) {
				TraceEvent("ProxyRatekeeperChanged", myID).detail("RKID", db->get().ratekeeper.get().id());
				// New ratekeeper: send a report immediately.
				nextRequestTimer = Void();
			} else {
				TraceEvent("ProxyRatekeeperDied", myID).log();
				nextRequestTimer = Never();
			}
		}
		when(wait(nextRequestTimer)) {
			nextRequestTimer = Never();
			if (db->get().ratekeeper.present()) {
				nextReply = brokenPromiseToNever(db->get().ratekeeper.get().reportCommitCostEstimation.getReply(
				    ReportCommitCostEstimationRequest(*ssTrTagCommitCost)));
			} else {
				nextReply = Never();
			}
		}
		when(wait(nextReply)) {
			nextReply = Never();
			ssTrTagCommitCost->clear();
// (Continuation of reportTxnTagCommitCost: schedule the next report after a delay.)
			nextRequestTimer = delay(SERVER_KNOBS->REPORT_TRANSACTION_COST_ESTIMATION_DELAY);
		}
	}
}

namespace {

// Accumulates the (possibly multi-part) TxnStateRequest stream received during
// recovery and tracks when all parts have arrived and been processed.
struct TransactionStateResolveContext {
	// Maximum sequence for txnStateRequest, this is defined when the request last flag is set.
	Sequence maxSequence = std::numeric_limits::max();

	// Flags marks received transaction state requests, we only process the transaction request when *all* requests are
	// received.
	std::unordered_set receivedSequences;

	ProxyCommitData* pCommitData = nullptr;

	// Pointer to transaction state store, shortcut for commitData.txnStateStore
	IKeyValueStore* pTxnStateStore = nullptr;

	// Actor streams
	PromiseStream>* pActors = nullptr;

	// Flag reports if the transaction state request is complete. This request should only happen during recover, i.e.
	// once per commit proxy.
	bool processed = false;

	TransactionStateResolveContext() = default;

	TransactionStateResolveContext(ProxyCommitData* pCommitData_, PromiseStream>* pActors_)
	  : pCommitData(pCommitData_), pTxnStateStore(pCommitData_->txnStateStore), pActors(pActors_) {
		ASSERT(pTxnStateStore != nullptr);
	}
};

// Once every TxnStateRequest part has arrived, replays the accumulated transaction
// state out of the txnStateStore: rebuilds the key -> server-tag map (keyInfo),
// applies the remaining metadata mutations, and restores the locked/metadataVersion
// state before enabling txnStateStore snapshots.
ACTOR Future processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext) {
	state KeyRange txnKeys = allKeys;
	state std::map tag_uid;

	// Build the tag -> server UID map once up front.
	RangeResult UIDtoTagMap = pContext->pTxnStateStore->readRange(serverTagKeys).get();
	for (const KeyValueRef& kv : UIDtoTagMap) {
		tag_uid[decodeServerTagValue(kv.value)] = decodeServerTagKey(kv.key);
	}

	loop {
		wait(yield());

		// Read the next batch of transaction-state keys; stop when the store is exhausted.
		RangeResult data =
		    pContext->pTxnStateStore
		        ->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES)
		        .get();
		if (!data.size())
			break;

		// Advance the scan window past the last key read.
		((KeyRangeRef&)txnKeys) = KeyRangeRef(keyAfter(data.back().key, txnKeys.arena()), txnKeys.end);

		MutationsVec mutations;
		std::vector, int>> keyInfoData;
		std::vector src, dest;
		ServerCacheInfo info;
		// NOTE: An ACTOR will be compiled into several classes, the this pointer is from one of them.
		auto updateTagInfo = [this](const std::vector& uids,
		                            std::vector& tags,
		                            std::vector>& storageInfoItems) {
			// Resolve each server UID to its cached storage info, collecting its tag.
			for (const auto& id : uids) {
				auto storageInfo = getStorageInfo(id, &pContext->pCommitData->storageCache, pContext->pTxnStateStore);
				ASSERT(storageInfo->tag != invalidTag);
				tags.push_back(storageInfo->tag);
				storageInfoItems.push_back(storageInfo);
			}
		};
		for (auto& kv : data) {
			if (!kv.key.startsWith(keyServersPrefix)) {
				// Non-keyServers entries are applied later as plain metadata mutations.
				mutations.emplace_back(mutations.arena(), MutationRef::SetValue, kv.key, kv.value);
				continue;
			}

			KeyRef k = kv.key.removePrefix(keyServersPrefix);
			if (k == allKeys.end) {
				// The sentinel entry at the end of the keyspace carries no shard info.
				continue;
			}
			decodeKeyServersValue(tag_uid, kv.value, src, dest);

			info.tags.clear();

			info.src_info.clear();
			updateTagInfo(src, info.tags, info.src_info);

			info.dest_info.clear();
			updateTagInfo(dest, info.tags, info.dest_info);

			uniquify(info.tags);
			keyInfoData.emplace_back(MapPair(k, info), 1);
		}

		// insert keyTag data separately from metadata mutations so that we can do one bulk insert which
		// avoids a lot of map lookups.
		pContext->pCommitData->keyInfo.rawInsert(keyInfoData);

		Arena arena;
		bool confChanges;
		applyMetadataMutations(SpanID(),
		                       *pContext->pCommitData,
		                       arena,
		                       Reference(),
		                       mutations,
		                       /* pToCommit= */ nullptr,
		                       confChanges,
		                       /* popVersion= */ 0,
		                       /* initialCommit= */ true);
	} // loop

	auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
	pContext->pCommitData->locked = lockedKey.present() && lockedKey.get().size();
	pContext->pCommitData->metadataVersion = pContext->pTxnStateStore->readValue(metadataVersionKey).get();

	pContext->pTxnStateStore->enableSnapshot();

	return Void();
}

// Handles one part of the recovery-time TxnStateRequest: stores its key-values into
// the txnStateStore, re-broadcasts the part to other commit proxies, and triggers
// full processing once every sequence number up to the last part has arrived.
ACTOR Future processTransactionStateRequestPart(TransactionStateResolveContext* pContext, TxnStateRequest request) {
	state const TxnStateRequest& req = request;
	state ProxyCommitData& commitData = *pContext->pCommitData;
	state PromiseStream>& addActor = *pContext->pActors;
	state Sequence& maxSequence = pContext->maxSequence;
	state ReplyPromise reply = req.reply;
	state std::unordered_set& txnSequences = pContext->receivedSequences;

	ASSERT(pContext->pCommitData != nullptr);
	ASSERT(pContext->pActors != nullptr);

	if (pContext->receivedSequences.count(request.sequence)) {
		// This part is already received. Still we will re-broadcast it to other CommitProxies
		pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
		wait(yield());
		return Void();
	}

	if (request.last) {
		// This is the last piece of the sequence, yet other pieces might still be on the way.
// (Continuation of processTransactionStateRequestPart: the last part fixes the expected count.)
		pContext->maxSequence = request.sequence + 1;
	}
	pContext->receivedSequences.insert(request.sequence);

	// Although we may receive the CommitTransactionRequest for the recovery transaction before all of the
	// TxnStateRequest, we will not get a resolution result from any resolver until the master has submitted its initial
	// (sequence 0) resolution request, which it doesn't do until we have acknowledged all TxnStateRequests
	ASSERT(!pContext->pCommitData->validState.isSet());

	for (auto& kv : request.data) {
		pContext->pTxnStateStore->set(kv, &request.arena);
	}
	pContext->pTxnStateStore->commit(true);

	if (pContext->receivedSequences.size() == pContext->maxSequence) {
		// Received all components of the txnStateRequest
		ASSERT(!pContext->processed);
		wait(processCompleteTransactionStateRequest(pContext));
		pContext->processed = true;
	}

	pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
	wait(yield());
	return Void();
}

} // anonymous namespace

// Main commit proxy actor: waits for the master's log system, initializes the
// txnStateStore and commit batching, spawns the helper actors above, and then
// services commit batches, snapshot, exclusion-safety, and txn-state requests.
ACTOR Future commitProxyServerCore(CommitProxyInterface proxy,
                                   MasterInterface master,
                                   Reference const> db,
                                   LogEpoch epoch,
                                   Version recoveryTransactionVersion,
                                   bool firstProxy,
                                   std::string whitelistBinPaths) {
	state ProxyCommitData commitData(
	    proxy.id(), master, proxy.getConsistentReadVersion, recoveryTransactionVersion, proxy.commit, db, firstProxy);

	state Future sequenceFuture = (Sequence)0;
	state PromiseStream, int>> batchedCommits;
	state Future commitBatcherActor;
	state Future lastCommitComplete = Void();

	state PromiseStream> addActor;
	// A broken promise from any helper actor is treated as a tlog failure.
	state Future onError = transformError(actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed());
	state double lastCommit = 0;

	state GetHealthMetricsReply healthMetricsReply;
	state GetHealthMetricsReply detailedHealthMetricsReply;

	addActor.send(waitFailureServer(proxy.waitFailure.getFuture()));
	addActor.send(traceRole(Role::COMMIT_PROXY, proxy.id()));

	//TraceEvent("CommitProxyInit1", proxy.id());

	// Wait until we can load the "real" logsystem, since we don't support switching them currently
	while (!(commitData.db->get().master.id() == master.id() &&
	         commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
		//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
		wait(commitData.db->onChange());
	}
	state Future dbInfoChange = commitData.db->onChange();
	//TraceEvent("ProxyInit3", proxy.id());

	commitData.resolvers = commitData.db->get().resolvers;
	ASSERT(commitData.resolvers.size() != 0);
	for (int i = 0; i < commitData.resolvers.size(); ++i) {
		// Per-resolver latency histogram.
		commitData.stats.resolverDist.push_back(
		    Histogram::getHistogram(LiteralStringRef("CommitProxy"),
		                            "ToResolver_" + commitData.resolvers[i].id().toString(),
		                            Histogram::Unit::microseconds));
	}

	// Initialize keyResolvers map
	auto rs = commitData.keyResolvers.modify(allKeys);
	for (auto r = rs.begin(); r != rs.end(); ++r)
		r->value().emplace_back(0, 0);

	commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
	commitData.logAdapter =
	    new LogSystemDiskQueueAdapter(commitData.logSystem, Reference>(), 1, false);
	commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
	createWhitelistBinPathVec(whitelistBinPaths, commitData.whitelistedBinPathVec);

	commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);

	// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is
	// only a approximate formula for limiting the memory used. COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an
	// estimate based on experiments and not an accurate one.
	state int64_t commitBatchesMemoryLimit = std::min(
	    SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT,
	    static_cast((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) /
	                 SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
	TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);

	addActor.send(monitorRemoteCommitted(&commitData));
	addActor.send(readRequestServer(proxy, addActor, &commitData));
	addActor.send(rejoinServer(proxy, &commitData));
	addActor.send(ddMetricsRequestServer(proxy, db));
	addActor.send(reportTxnTagCommitCost(proxy.id(), db, &commitData.ssTrTagCommitCost));

	// wait for txnStateStore recovery
	wait(success(commitData.txnStateStore->readValue(StringRef())));

	// Batch byte limit scales down with the number of commit proxies sharing the load.
	int commitBatchByteLimit =
	    (int)std::min(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MAX,
	                  std::max(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MIN,
	                           SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE *
	                               pow(commitData.db->get().client.commitProxies.size(),
	                                   SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER)));

	commitBatcherActor = commitBatcher(
	    &commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit);

	// This has to be declared after the commitData.txnStateStore get initialized
	state TransactionStateResolveContext transactionStateResolveContext(&commitData, &addActor);

	loop choose {
		when(wait(dbInfoChange)) {
			dbInfoChange = commitData.db->onChange();
			if (commitData.db->get().master.id() == master.id() &&
			    commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
				// Re-derive the log system and replay outstanding pops against it.
				commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
				for (auto it : commitData.tag_popped) {
					commitData.logSystem->pop(it.second, it.first);
				}
				commitData.logSystem->popTxs(commitData.lastTxsPop, tagLocalityRemoteLog);
			}
			commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);
		}
		when(wait(onError)) {}
		when(std::pair, int> batchedRequests = waitNext(batchedCommits.getFuture())) {
			// WARNING: this code is run at a high priority, so it needs to do as little work as possible
			const std::vector& trs = batchedRequests.first;
			int batchBytes = batchedRequests.second;
			//TraceEvent("CommitProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
			// Commit when there are transactions, or periodically (empty commit) once accepting commits.
			if (trs.size() || (commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
			                   now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
				lastCommit = now();

				if (trs.size() || lastCommitComplete.isReady()) {
					// A batch that fails to finish within the liveness timeout is treated as failed-to-progress.
					lastCommitComplete = transformError(
					    timeoutError(
					        commitBatch(&commitData,
					                    const_cast*>(&batchedRequests.first),
					                    batchBytes),
					        SERVER_KNOBS->COMMIT_PROXY_LIVENESS_TIMEOUT),
					    timed_out(),
					    failed_to_progress());
					addActor.send(lastCommitComplete);
				}
			}
		}
		when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
			TraceEvent(SevDebug, "SnapMasterEnqueue").log();
			addActor.send(proxySnapCreate(snapReq, &commitData));
		}
		when(ExclusionSafetyCheckRequest exclCheckReq = waitNext(proxy.exclusionSafetyCheckReq.getFuture())) {
			addActor.send(proxyCheckSafeExclusion(db, exclCheckReq));
		}
		when(TxnStateRequest request = waitNext(proxy.txnState.getFuture())) {
			addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request));
		}
	}
}

// Throws worker_removed once a newer recovery no longer lists this proxy's interface.
ACTOR Future checkRemoved(Reference const> db,
                          uint64_t recoveryCount,
                          CommitProxyInterface myInterface) {
	loop {
		if (db->get().recoveryCount >= recoveryCount &&
		    !std::count(db->get().client.commitProxies.begin(), db->get().client.commitProxies.end(), myInterface)) {
			throw worker_removed();
		}
		wait(db->onChange());
	}
}

// Top-level commit proxy entry point: runs the core actor alongside removal
// detection, swallowing the error codes that represent expected terminations.
ACTOR Future commitProxyServer(CommitProxyInterface proxy,
                               InitializeCommitProxyRequest req,
                               Reference const> db,
                               std::string whitelistBinPaths) {
	try {
		state Future core = commitProxyServerCore(proxy,
// (Continuation of commitProxyServer: remaining arguments, then race the core against removal.)
		                                          req.master,
		                                          db,
		                                          req.recoveryCount,
		                                          req.recoveryTransactionVersion,
		                                          req.firstProxy,
		                                          whitelistBinPaths);
		wait(core || checkRemoved(db, req.recoveryCount, proxy));
	} catch (Error& e) {
		TraceEvent("CommitProxyTerminated", proxy.id()).error(e, true);

		// These error codes are the expected ways a commit proxy shuts down; anything
		// else is rethrown to the caller.
		if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
		    e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
		    e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out &&
		    e.code() != error_code_failed_to_progress) {
			throw;
		}
		TEST(e.code() == error_code_failed_to_progress); // Commit proxy failed to progress
	}
	return Void();
}