From 0352e8ee0b9abcf3eb3c6bb91835a0d2e8604849 Mon Sep 17 00:00:00 2001
From: Xiaoxi Wang
Date: Sun, 2 Aug 2020 18:02:36 +0000
Subject: [PATCH] pick busiest commit tag periodically

---
 fdbclient/Knobs.cpp               |   2 +-
 fdbserver/Knobs.cpp               |   4 +-
 fdbserver/Knobs.h                 |   4 +-
 fdbserver/Ratekeeper.actor.cpp    | 122 +++++++++++++++++-------------
 fdbserver/storageserver.actor.cpp |   6 +-
 5 files changed, 78 insertions(+), 60 deletions(-)

diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp
index 9549626213..a5ed5a585a 100644
--- a/fdbclient/Knobs.cpp
+++ b/fdbclient/Knobs.cpp
@@ -230,7 +230,7 @@ void ClientKnobs::initialize(bool randomize) {
 	// transaction tags
 	init( MAX_TAGS_PER_TRANSACTION, 5 );
 	init( MAX_TRANSACTION_TAG_LENGTH, 16 );
-	init( COMMIT_SAMPLE_BYTE, 100000 );
+	init( COMMIT_SAMPLE_BYTE, 16384 ); if( randomize && BUGGIFY ) COMMIT_SAMPLE_BYTE = 4096; // The same as SERVER_KNOBS->OPERATION_COST_BYTE_FACTOR
 	init( READ_TAG_SAMPLE_RATE, 0.01 ); if( randomize && BUGGIFY ) READ_TAG_SAMPLE_RATE = 1.0; // Communicated to clients from cluster
 	init( TAG_THROTTLE_SMOOTHING_WINDOW, 2.0 );
 	init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index 58e9d0438b..2992fce9ff 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -555,8 +555,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSimulated) {
 	init( BEHIND_CHECK_COUNT, 2 );
 	init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
 	init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
-	init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0;
-	init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
+	init( MIN_TAG_PAGES_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_RATE = 0;
+	init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 1.0;
 	init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
 	init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, false ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = true;
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index da4d8c3ec2..2a8d9d10dc 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -484,8 +484,8 @@ public:
 	int BEHIND_CHECK_COUNT;
 	int64_t BEHIND_CHECK_VERSIONS;
 	double WAIT_METRICS_WRONG_SHARD_CHANCE;
-	int64_t MIN_TAG_PAGES_READ_RATE;
-	double READ_TAG_MEASUREMENT_INTERVAL;
+	int64_t MIN_TAG_PAGES_RATE;
+	double TAG_MEASUREMENT_INTERVAL;
 	int64_t OPERATION_COST_BYTE_FACTOR;
 	bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp
index e1f13b8120..37ef4562ad 100644
--- a/fdbserver/Ratekeeper.actor.cpp
+++ b/fdbserver/Ratekeeper.actor.cpp
@@ -97,17 +97,21 @@ struct StorageQueueInfo {
 	Smoother smoothTotalSpace;
 	limitReason_t limitReason;
-	Optional<TransactionTag> busiestTag;
-	double busiestTagFractionalBusyness;
-	double busiestTagRate;
+	Optional<TransactionTag> busiestReadTag, busiestWriteTag;
+	double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0;
+	double busiestReadTagRate = 0, busiestWriteTagRate = 0;
+
+	// refreshed periodically
+	TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
+	uint64_t totalWriteBytes = 0;
+	int totalWriteOps = 0;

 	StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
 		smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
 		smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
-		smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited), busiestTagFractionalBusyness(0),
-		busiestTagRate(0) {
+		smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
 		// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
 		lastReply.instanceID = -1;
 	}
@@ -534,7 +538,6 @@ struct RatekeeperData {
 	Database db;
 	Map<UID, StorageQueueInfo> storageQueueInfo;
-	int validSS = 0, numBusySS = 0;
 	Map<UID, TLogQueueInfo> tlogQueueInfo;
 	std::map<UID, ProxyInfo> proxyInfo;
@@ -548,6 +551,7 @@ struct RatekeeperData {
 	double lastWarning;
 	double lastSSListFetchedTimestamp;
+	double lastBusiestCommitTagPick = 0;

 	RkTagThrottleCollection throttledTags;
 	uint64_t throttledTagChangeId;
@@ -613,9 +617,9 @@ ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageServerInterface ssi ) {
 				myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
 			}
-			myQueueInfo->value.busiestTag = reply.get().busiestTag;
-			myQueueInfo->value.busiestTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
-			myQueueInfo->value.busiestTagRate = reply.get().busiestTagRate;
+			myQueueInfo->value.busiestReadTag = reply.get().busiestTag;
+			myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
+			myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate;
 		} else {
 			if(myQueueInfo->value.valid) {
 				TraceEvent("RkStorageServerDidNotRespond", self->id)
@@ -842,6 +846,49 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
 	}
 }

+Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
+	double elapsed = now() - self->lastBusiestCommitTagPick;
+	if(elapsed <= 0) return Void();
+	// for each SS, select the busiest commit tag from tagCostEst
+	for(auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) {
+		it->value.busiestWriteTag.reset();
+		TransactionTag busiestTag;
+		TransactionCommitCostEstimation maxCost;
+		double maxRate = 0, maxBusyness = 0;
+		for(const auto& [tag, cost] : it->value.tagCostEst) {
+			// opsBeforeSample / opsSampled = COMMIT_SAMPLE_BYTE / 1
+			double rate = CLIENT_KNOBS->COMMIT_SAMPLE_BYTE * cost.getOpsSum() / elapsed;
+			if(rate > maxRate) {
+				busiestTag = tag;
+				maxRate = rate;
+				maxCost = cost;
+			}
+		}
+		if(maxRate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
+			it->value.busiestWriteTag = busiestTag;
+			ASSERT(it->value.totalWriteBytes > 0 && it->value.totalWriteOps > 0);
+			maxBusyness = 0.5 * (maxCost.getBytesSum() / it->value.totalWriteBytes + maxCost.getOpsSum() / it->value.totalWriteOps);
+			it->value.busiestWriteTagFractionalBusyness = maxBusyness;
+			it->value.busiestWriteTagRate = maxRate;
+		}
+		// reset statistics
+		it->value.tagCostEst.clear();
+		it->value.totalWriteOps = 0;
+		it->value.totalWriteBytes = 0;
+
+		TraceEvent("BusiestWriteTag", it->key)
+		    .detail("Elapsed", elapsed)
+		    .detail("Tag", printable(busiestTag))
+		    .detail("TagOps", maxCost.getOpsSum())
+		    .detail("TagBytes", maxCost.getBytesSum())
+		    .detail("TagRate", maxRate)
+		    .detail("TagBusyness", maxBusyness)
+		    .detail("Reported", it->value.busiestWriteTag.present())
+		    .trackLatest(it->key.toString() + "_BusiestWriteTag");
+	}
+	self->lastBusiestCommitTagPick = now();
+	return Void();
+}
+
 //ACTOR Future<Void> monitorDDMetricsChanges(RatekeeperData *self, Reference<AsyncVar<ServerDBInfo>> db) {
 //	state bool isFirstRep = true;
 //	loop {
@@ -870,13 +917,13 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
 //}

 void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
-	if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST) {
+	if(ss.busiestReadTag.present() && ss.busiestReadTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestReadTagRate > SERVER_KNOBS->MIN_TAG_COST) {
 		TEST(true); // Transaction tag auto-throttled

-		Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
+		Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, ss.busiestReadTag.get(), ss.busiestReadTagFractionalBusyness);
 		if(clientRate.present()) {
 			TagSet tags;
-			tags.addTag(ss.busiestTag.get());
+			tags.addTag(ss.busiestReadTag.get());

 			self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
 		}
@@ -1010,8 +1057,6 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 		ssReasons[ss.id] = ssLimitReason;
 	}
-	self->validSS = sscount;
-	self->numBusySS = writeSaturatedSSCount;

 	std::set<Optional<Standalone<StringRef>>> ignoredMachines;
 	for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
@@ -1240,44 +1285,15 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 }

 void updateCommitCostEstimation(RatekeeperData* self, UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation) {
-//	if(self->validSS <= 0) return;
-//	int opsSum = 0;
-//	double bytesSum = 0;
-//	std::multimap<std::pair<double, int>, TransactionTag> costTagReverseIndex;
-//	for(const auto& [tagName, costEst] : costEstimation) {
-//		if(self->throttledTags.tagData.count(tagName) == 0) continue;
-//		int ops = costEst.numClear + costEst.numAtomicWrite + costEst.numWrite;
-//		opsSum += ops;
-//		double bytes = costEst.bytesClearEst + costEst.bytesAtomicWrite + costEst.bytesWrite + costEst.numClearShards * self->smoothMeanShardSize.smoothTotal();
-//		bytesSum += bytes;
-//		costTagReverseIndex.emplace(std::make_pair(-bytes, -ops), tagName);
-//	}
-//
-//	ASSERT(self->validSS > 0);
-//	int throttledNum = std::max(1, (int)(self->throttledTags.tagData.size() * self->numBusySS / self->validSS));
-//	// calculate fractionalBusyness
-//	for(auto& [byteOps, tagName] : costTagReverseIndex) { // descending order
-//		if(throttledNum <= 0) break;
-//		if(self->throttledTags.manualThrottledTags.count(tagName) > 0) continue;
-//		double fractionalBusyness = 0.5 * -byteOps.first / (bytesSum+1) + 0.5 * -byteOps.second / (opsSum+1);
-//		if(self->throttledTags.autoThrottledTags.count(tagName) > 0) { // has been throttled
-//			self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness);
-//		}
-//		else { // new auto-throttled
-//			double opsRate = -byteOps.second / (costEstimation.find(tagName)->second.existTime + 1);
-//			if(fractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && opsRate > SERVER_KNOBS->MIN_TAG_COST)
-//			{
-//				Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness);
-//				if(clientRate.present()) {
-//					TagSet tags;
-//					tags.addTag(tagName);
-//
-//					self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
-//				}
-//			}
-//		}
-//		throttledNum --;
-//	}
+	for(auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++ it) {
+		auto tagCostIt = costEstimation.find(it->key);
+		if(tagCostIt == costEstimation.end()) continue;
+		for(const auto& [tagName, cost] : tagCostIt->second) {
+			it->value.tagCostEst[tagName] += cost;
+			it->value.totalWriteBytes += cost.getBytesSum();
+			it->value.totalWriteOps += cost.getOpsSum();
+		}
+	}
 }

 ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
 	loop {
@@ -1321,6 +1337,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
+	self.addActor.send(recurring([&](){ refreshStorageServerCommitCost(&self); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
+
 	TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()).detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
 		.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 908657f3db..4a451ee282 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -498,7 +498,7 @@ public:
 		previousBusiestTag.reset();
 		if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
 			double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
-			if(rate > SERVER_KNOBS->MIN_TAG_PAGES_READ_RATE) {
+			if(rate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
 				previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
 			}

@@ -508,7 +508,7 @@ public:
 				.detail("TagCost", busiestTagCount)
 				.detail("TotalSampledCost", intervalTotalSampledCount)
 				.detail("Reported", previousBusiestTag.present())
-				.trackLatest(id.toString() + "/BusiestReadTag");
+				.trackLatest(id.toString() + "_BusiestReadTag");
 		}

 		intervalCounts.clear();
@@ -3775,7 +3775,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
 	self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));

 	self->transactionTagCounter.startNewInterval(self->thisServerID);
-	self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
+	self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));

 	self->coreStarted.send( Void() );