diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index ea9701b61b..a523f5aaa7 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel return Void(); } +ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Reference<Histogram> dist, + Future<ResolveTransactionBatchReply> in) { + state double startTime = now(); + ResolveTransactionBatchReply reply = wait(in); + dist->sampleSeconds(now() - startTime); + return reply; +} + namespace CommitBatch { struct CommitBatchContext { @@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) { TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1)); double queuingDelay = g_network->now() - timeStart; + pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay); if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND || (g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) && SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) { @@ -619,6 +628,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) { pProxyCommitData->commitVersionRequestNumber++, pProxyCommitData->mostRecentProcessedRequestNumber, pProxyCommitData->dbgid); + state double beforeGettingCommitVersion = now(); GetCommitVersionReply versionReply = wait(brokenPromiseToNever( pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply))); @@ -626,6 +636,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) { pProxyCommitData->stats.txnCommitVersionAssigned += trs.size(); pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version; + pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion); self->commitVersion = versionReply.version; self->prevVersion = versionReply.prevVersion; @@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) { } ACTOR Future<Void> getResolution(CommitBatchContext* self) { + state double resolutionStart = now(); // Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with // resolution processing but is still using CPU ProxyCommitData* pProxyCommitData = self->pProxyCommitData; @@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) { std::vector<Future<ResolveTransactionBatchReply>> replies; for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) { requests.requests[r].debugID = self->debugID; - replies.push_back(brokenPromiseToNever( - pProxyCommitData->resolvers[r].resolve.getReply(requests.requests[r], TaskPriority::ProxyResolverReply))); + replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r], + brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply( + requests.requests[r], TaskPriority::ProxyResolverReply)))); } self->transactionResolverMap.swap(requests.transactionResolverMap); @@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) { std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies)); self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp)); + self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart); if (self->debugID.present()) { g_traceBatch.addEvent( "CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution"); @@ -1055,6 +1069,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) { } ACTOR Future<Void> postResolution(CommitBatchContext* self) { + state double postResolutionStart = now(); state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; state std::vector<CommitTransactionRequest>& trs = self->trs; state const int64_t localBatchNumber = self->localBatchNumber; @@ -1064,6 +1079,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) { bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1; TEST(queuedCommits); // Queuing post-resolution commit processing wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1)); + state double postResolutionQueuing = now(); + pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart); wait(yield(TaskPriority::ProxyCommitYield1)); self->computeStart = g_network->timer(); @@ -1212,10 +1229,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) { 1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]); } + pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing); return Void(); } ACTOR Future<Void> transactionLogging(CommitBatchContext* self) { + state double tLoggingStart = now(); state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; state Span span("MP:transactionLogging"_loc, self->span.context); @@ -1249,11 +1268,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) { pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo); } pProxyCommitData->logSystem->popTxs(self->msg.popTo); - + pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart); return Void(); } ACTOR Future<Void> reply(CommitBatchContext* self) { + state double replyStart = now(); state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; state Span span("MP:reply"_loc, self->span.context); @@ -1385,7 +1405,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) { pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount; ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0); wait(self->releaseFuture); - + pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart); return Void(); } @@ -1856,7 +1876,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy, commitData.resolvers = commitData.db->get().resolvers; ASSERT(commitData.resolvers.size() != 0); - + for (int i = 0; i < commitData.resolvers.size(); ++i) { + commitData.stats.resolverDist.push_back(Histogram::getHistogram( + LiteralStringRef("CommitProxy"), "ToResolver_" + commitData.resolvers[i].id().toString(), Histogram::Unit::microseconds)); + } auto rs = commitData.keyResolvers.modify(allKeys); for (auto r = rs.begin(); r != rs.end(); ++r) r->value().emplace_back(0, 0); diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 10665e8d30..28c706d67f 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -31,6 +31,7 @@ #include "fdbserver/MutationTracking.h" #include "flow/Arena.h" #include "flow/Error.h" +#include "flow/Histogram.h" #include "flow/IndexedSet.h" #include "flow/Knobs.h" #include "fdbrpc/ReplicationPolicy.h" @@ -57,6 +58,7 @@ public: std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters; std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers; std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers; + std::vector<Reference<Histogram>> tlogPushDistTrackers; int32_t tLogWriteAntiQuorum; int32_t tLogReplicationFactor; std::vector<LocalityData> tLogLocalities; // Stores the localities of the log servers diff --git a/fdbserver/ProxyCommitData.actor.h b/fdbserver/ProxyCommitData.actor.h index d625f1f508..7694b7e6c9 100644 --- a/fdbserver/ProxyCommitData.actor.h +++ b/fdbserver/ProxyCommitData.actor.h @@ -74,6 +74,15 @@ struct ProxyStats { int64_t maxComputeNS; int64_t minComputeNS; + Reference<Histogram> commitBatchQueuingDist; + Reference<Histogram> getCommitVersionDist; + std::vector<Reference<Histogram>> resolverDist; + Reference<Histogram> resolutionDist; + Reference<Histogram> postResolutionDist; + Reference<Histogram> processingMutationDist; + Reference<Histogram> tlogLoggingDist; + Reference<Histogram> replyCommitDist; + int64_t getAndResetMaxCompute() { int64_t r = maxComputeNS; maxComputeNS = 0; @@ -113,7 +122,28 @@ struct ProxyStats { id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), - maxComputeNS(0), minComputeNS(1e12) { + maxComputeNS(0), minComputeNS(1e12), + commitBatchQueuingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("CommitBatchQueuing"), + Histogram::Unit::microseconds)), + getCommitVersionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("GetCommitVersion"), + Histogram::Unit::microseconds)), + resolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("Resolution"), + Histogram::Unit::microseconds)), + postResolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("PostResolutionQueuing"), + Histogram::Unit::microseconds)), + processingMutationDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("ProcessingMutation"), + Histogram::Unit::microseconds)), + tlogLoggingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("TlogLogging"), + Histogram::Unit::microseconds)), + replyCommitDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"), + LiteralStringRef("ReplyCommit"), + Histogram::Unit::microseconds)) { specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; }); specialCounter(cc, "Version", [pVersion]() { return *pVersion; }); specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); }); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index abb64b678a..f8d75e1c72 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -527,6 +527,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition } ACTOR static Future<TLogCommitReply> recordPushMetrics(Reference<ConnectionResetInfo> self, + Reference<Histogram> dist, NetworkAddress addr, Future<TLogCommitReply> in) { state double startTime = now(); @@ -541,6 +542,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition self->fastReplies++; } } + dist->sampleSeconds(now() - startTime); return t; } @@ -563,12 +565,21 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>()); } } + if (it->tlogPushDistTrackers.empty()) { + for (int i = 0; i < it->logServers.size(); i++) { + it->tlogPushDistTrackers.push_back( + Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(), + it->logServers[i]->get().interf().address().toString(), + Histogram::Unit::microseconds)); + } + } vector<Future<Void>> tLogCommitResults; for (int loc = 0; loc < it->logServers.size(); loc++) { Standalone<StringRef> msg = data.getMessages(location); data.recordEmptyMessage(location, msg); allReplies.push_back(recordPushMetrics( it->connectionResetTrackers[loc], + it->tlogPushDistTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext, msg.arena(),