Merge pull request #3960 from dongxinEric/misc/expose-proxy-local-rate-info

Expose local transaction rate and limit info on commit proxies.
This commit is contained in:
Xin Dong 2020-11-12 15:23:50 -08:00 committed by GitHub
commit 8343c78bf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -63,6 +63,14 @@ struct ProxyStats {
Counter conflictRanges;
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
Version lastCommitVersionAssigned;
double transactionRateAllowed, batchTransactionRateAllowed;
double transactionLimit, batchTransactionLimit;
// Fraction of the GRV request queue that was processed in one attempt to hand out read versions.
double percentageOfDefaultGRVQueueProcessed;
double percentageOfBatchGRVQueueProcessed;
LatencySample defaultTxnGRVTimeInQueue;
LatencySample batchTxnGRVTimeInQueue;
LatencySample commitLatencySample;
LatencySample grvLatencySample;
@ -72,24 +80,56 @@ struct ProxyStats {
Future<Void> logger;
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0), commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
{
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc),
mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc),
keyServerLocationErrors("KeyServerLocationErrors", cc), lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0),
percentageOfDefaultGRVQueueProcessed(0), percentageOfBatchGRVQueueProcessed(0) {
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
// The rate at which the limit (budget) is allowed to grow.
specialCounter(cc, "SystemAndDefaultTxnRateAllowed", [this]() { return this->transactionRateAllowed; });
specialCounter(cc, "BatchTransactionRateAllowed", [this]() { return this->batchTransactionRateAllowed; });
specialCounter(cc, "SystemAndDefaultTxnLimit", [this]() { return this->transactionLimit; });
specialCounter(cc, "BatchTransactionLimit", [this]() { return this->batchTransactionLimit; });
specialCounter(cc, "PercentageOfDefaultGRVQueueProcessed",
[this]() { return this->percentageOfDefaultGRVQueueProcessed; });
specialCounter(cc, "PercentageOfBatchGRVQueueProcessed",
[this]() { return this->percentageOfBatchGRVQueueProcessed; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
}
};
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply, ProxyStats* stats) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -120,7 +160,14 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
reply = Never();
*outTransactionRate = rep.transactionRate;
*outBatchTransactionRate = rep.batchTransactionRate;
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
stats->transactionRateAllowed = rep.transactionRate;
stats->batchTransactionRateAllowed = rep.batchTransactionRate;
// TraceEvent("MasterProxyTxRate", myID)
// .detail("RKID", db->get().ratekeeper.get().id())
// .detail("RateAllowed", rep.transactionRate)
// .detail("BatchRateAllowed", rep.batchTransactionRate)
// .detail("Lease", rep.leaseDuration)
// .detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
@ -1266,13 +1313,10 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
return Void();
}
ACTOR static Future<Void> transactionStarter(
MasterProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor,
ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply)
{
ACTOR static Future<Void> transactionStarter(MasterProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor, ProxyCommitData* commitData,
GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply, ProxyStats* stats) {
state double lastGRVTime = 0;
state PromiseStream<Void> GRVTimer;
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
@ -1288,7 +1332,8 @@ ACTOR static Future<Void> transactionStarter(
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate,
&batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, stats));
addActor.send(queueTransactionStartRequests(&systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
// Get a list of the other proxies that go together with us
@ -1315,6 +1360,9 @@ ACTOR static Future<Void> transactionStarter(
normalRateInfo.reset(elapsed);
batchRateInfo.reset(elapsed);
stats->transactionLimit = normalRateInfo.limit;
stats->batchTransactionLimit = batchRateInfo.limit;
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
int defaultPriTransactionsStarted[2] = { 0, 0 };
@ -1325,6 +1373,8 @@ ACTOR static Future<Void> transactionStarter(
int requestsToStart = 0;
uint32_t defaultQueueSize = defaultQueue.size();
uint32_t batchQueueSize = batchQueue.size();
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
@ -1353,12 +1403,16 @@ ACTOR static Future<Void> transactionStarter(
}
transactionsStarted[req.flags&1] += tc;
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE)
double currentTime = g_network->timer();
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
systemTransactionsStarted[req.flags & 1] += tc;
else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT)
} else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
defaultPriTransactionsStarted[req.flags & 1] += tc;
else
stats->defaultTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
} else {
batchPriTransactionsStarted[req.flags & 1] += tc;
stats->batchTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
}
start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
transactionQueue->pop_front();
@ -1390,6 +1444,8 @@ ACTOR static Future<Void> transactionStarter(
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
}
int defaultGRVProcessed = 0;
int batchGRVProcessed = 0;
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
@ -1399,8 +1455,12 @@ ACTOR static Future<Void> transactionStarter(
if (i == 0) {
addActor.send(timeReply(readVersionReply, replyTimes));
}
defaultGRVProcessed += defaultPriTransactionsStarted[i];
batchGRVProcessed += batchPriTransactionsStarted[i];
}
}
// Ratio of GRV work started in this pass to the queue size sampled before the pass began.
// The queue sizes are zero whenever the corresponding queue was empty; dividing by them would
// store 0/0 (NaN) in the stats fields read by the special counters. An empty queue has nothing
// left waiting, so report it as fully processed (1) instead.
stats->percentageOfDefaultGRVQueueProcessed =
    defaultQueueSize ? (double)defaultGRVProcessed / defaultQueueSize : 1;
stats->percentageOfBatchGRVQueueProcessed = batchQueueSize ? (double)batchGRVProcessed / batchQueueSize : 1;
}
}
@ -1755,7 +1815,8 @@ ACTOR Future<Void> masterProxyServerCore(
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply,
&detailedHealthMetricsReply, &commitData.stats));
addActor.send(readRequestServer(proxy, addActor, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));