Add histograms to CommitProxyServer. (#5299)

This commit is contained in:
yao-xiao-github 2021-08-05 09:17:37 -07:00 committed by GitHub
parent f6beda6623
commit 8609b45354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 6 deletions

View File

@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
return Void();
}
ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Reference<Histogram> dist,
Future<ResolveTransactionBatchReply> in) {
state double startTime = now();
ResolveTransactionBatchReply reply = wait(in);
dist->sampleSeconds(now() - startTime);
return reply;
}
namespace CommitBatch {
struct CommitBatchContext {
@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
double queuingDelay = g_network->now() - timeStart;
pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay);
if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) &&
SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) {
@ -619,6 +628,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->commitVersionRequestNumber++,
pProxyCommitData->mostRecentProcessedRequestNumber,
pProxyCommitData->dbgid);
state double beforeGettingCommitVersion = now();
GetCommitVersionReply versionReply = wait(brokenPromiseToNever(
pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)));
@ -626,6 +636,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->stats.txnCommitVersionAssigned += trs.size();
pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version;
pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion);
self->commitVersion = versionReply.version;
self->prevVersion = versionReply.prevVersion;
@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
}
ACTOR Future<Void> getResolution(CommitBatchContext* self) {
state double resolutionStart = now();
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
// resolution processing but is still using CPU
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<Future<ResolveTransactionBatchReply>> replies;
for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) {
requests.requests[r].debugID = self->debugID;
replies.push_back(brokenPromiseToNever(
pProxyCommitData->resolvers[r].resolve.getReply(requests.requests[r], TaskPriority::ProxyResolverReply)));
replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r],
brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply(
requests.requests[r], TaskPriority::ProxyResolverReply))));
}
self->transactionResolverMap.swap(requests.transactionResolverMap);
@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart);
if (self->debugID.present()) {
g_traceBatch.addEvent(
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
@ -1055,6 +1069,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
ACTOR Future<Void> postResolution(CommitBatchContext* self) {
state double postResolutionStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state std::vector<CommitTransactionRequest>& trs = self->trs;
state const int64_t localBatchNumber = self->localBatchNumber;
@ -1064,6 +1079,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1;
TEST(queuedCommits); // Queuing post-resolution commit processing
wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
state double postResolutionQueuing = now();
pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart);
wait(yield(TaskPriority::ProxyCommitYield1));
self->computeStart = g_network->timer();
@ -1212,10 +1229,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing);
return Void();
}
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
state double tLoggingStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:transactionLogging"_loc, self->span.context);
@ -1249,11 +1268,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo);
}
pProxyCommitData->logSystem->popTxs(self->msg.popTo);
pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart);
return Void();
}
ACTOR Future<Void> reply(CommitBatchContext* self) {
state double replyStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:reply"_loc, self->span.context);
@ -1385,7 +1405,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount;
ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
wait(self->releaseFuture);
pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart);
return Void();
}
@ -1856,7 +1876,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
commitData.resolvers = commitData.db->get().resolvers;
ASSERT(commitData.resolvers.size() != 0);
for (int i = 0; i < commitData.resolvers.size(); ++i) {
commitData.stats.resolverDist.push_back(Histogram::getHistogram(
LiteralStringRef("CommitProxy"), "ToResolver_" + commitData.resolvers[i].id().toString(), Histogram::Unit::microseconds));
}
auto rs = commitData.keyResolvers.modify(allKeys);
for (auto r = rs.begin(); r != rs.end(); ++r)
r->value().emplace_back(0, 0);

View File

@ -31,6 +31,7 @@
#include "fdbserver/MutationTracking.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/Histogram.h"
#include "flow/IndexedSet.h"
#include "flow/Knobs.h"
#include "fdbrpc/ReplicationPolicy.h"
@ -57,6 +58,7 @@ public:
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers;
std::vector<Reference<Histogram>> tlogPushDistTrackers;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
std::vector<LocalityData> tLogLocalities; // Stores the localities of the log servers

View File

@ -74,6 +74,15 @@ struct ProxyStats {
int64_t maxComputeNS;
int64_t minComputeNS;
Reference<Histogram> commitBatchQueuingDist;
Reference<Histogram> getCommitVersionDist;
std::vector<Reference<Histogram>> resolverDist;
Reference<Histogram> resolutionDist;
Reference<Histogram> postResolutionDist;
Reference<Histogram> processingMutationDist;
Reference<Histogram> tlogLoggingDist;
Reference<Histogram> replyCommitDist;
int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;
@ -113,7 +122,28 @@ struct ProxyStats {
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
maxComputeNS(0), minComputeNS(1e12) {
maxComputeNS(0), minComputeNS(1e12),
commitBatchQueuingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("CommitBatchQueuing"),
Histogram::Unit::microseconds)),
getCommitVersionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("GetCommitVersion"),
Histogram::Unit::microseconds)),
resolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("Resolution"),
Histogram::Unit::microseconds)),
postResolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("PostResolutionQueuing"),
Histogram::Unit::microseconds)),
processingMutationDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ProcessingMutation"),
Histogram::Unit::microseconds)),
tlogLoggingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("TlogLogging"),
Histogram::Unit::microseconds)),
replyCommitDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ReplyCommit"),
Histogram::Unit::microseconds)) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });

View File

@ -527,6 +527,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
}
ACTOR static Future<TLogCommitReply> recordPushMetrics(Reference<ConnectionResetInfo> self,
Reference<Histogram> dist,
NetworkAddress addr,
Future<TLogCommitReply> in) {
state double startTime = now();
@ -541,6 +542,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
self->fastReplies++;
}
}
dist->sampleSeconds(now() - startTime);
return t;
}
@ -563,12 +565,21 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
}
}
if (it->tlogPushDistTrackers.empty()) {
for (int i = 0; i < it->logServers.size(); i++) {
it->tlogPushDistTrackers.push_back(
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
it->logServers[i]->get().interf().address().toString(),
Histogram::Unit::microseconds));
}
}
vector<Future<Void>> tLogCommitResults;
for (int loc = 0; loc < it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
data.recordEmptyMessage(location, msg);
allReplies.push_back(recordPushMetrics(
it->connectionResetTrackers[loc],
it->tlogPushDistTrackers[loc],
it->logServers[loc]->get().interf().address(),
it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext,
msg.arena(),