
pick busiest commit tag periodically
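Ratekeeper now accumulates the per-tag commit cost estimations reported for each storage server, and a recurring task picks the busiest write tag per storage server every TAG_MEASUREMENT_INTERVAL seconds, tracking it alongside the existing busiest read tag. The read-specific knobs MIN_TAG_PAGES_READ_RATE and READ_TAG_MEASUREMENT_INTERVAL are renamed to MIN_TAG_PAGES_RATE and TAG_MEASUREMENT_INTERVAL since they now apply to both reads and writes.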

Xiaoxi Wang 2020-08-02 18:02:36 +00:00
parent 92c1112c74
commit 0352e8ee0b
5 changed files with 78 additions and 60 deletions

@@ -230,7 +230,7 @@ void ClientKnobs::initialize(bool randomize) {
// transaction tags
init( MAX_TAGS_PER_TRANSACTION, 5 );
init( MAX_TRANSACTION_TAG_LENGTH, 16 );
init( COMMIT_SAMPLE_BYTE, 100000 );
init( COMMIT_SAMPLE_BYTE, 16384 ); if( randomize && BUGGIFY ) COMMIT_SAMPLE_BYTE = 4096; // The same as SERVER_KNOBS->OPERATION_COST_BYTE_FACTOR
init( READ_TAG_SAMPLE_RATE, 0.01 ); if( randomize && BUGGIFY ) READ_TAG_SAMPLE_RATE = 1.0; // Communicated to clients from cluster
init( TAG_THROTTLE_SMOOTHING_WINDOW, 2.0 );
init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;

@@ -555,8 +555,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( BEHIND_CHECK_COUNT, 2 );
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0;
init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
init( MIN_TAG_PAGES_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_RATE = 0;
init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 1.0;
init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, false ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = true;

@@ -484,8 +484,8 @@ public:
int BEHIND_CHECK_COUNT;
int64_t BEHIND_CHECK_VERSIONS;
double WAIT_METRICS_WRONG_SHARD_CHANCE;
int64_t MIN_TAG_PAGES_READ_RATE;
double READ_TAG_MEASUREMENT_INTERVAL;
int64_t MIN_TAG_PAGES_RATE;
double TAG_MEASUREMENT_INTERVAL;
int64_t OPERATION_COST_BYTE_FACTOR;
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;

@@ -97,17 +97,21 @@ struct StorageQueueInfo {
Smoother smoothTotalSpace;
limitReason_t limitReason;
Optional<TransactionTag> busiestTag;
double busiestTagFractionalBusyness;
double busiestTagRate;
Optional<TransactionTag> busiestReadTag, busiestWriteTag;
double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0;
double busiestReadTagRate = 0, busiestWriteTagRate = 0;
// Per-tag commit cost accumulated since the last refresh; periodically consumed and cleared by refreshStorageServerCommitCost()
TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
uint64_t totalWriteBytes = 0;
int totalWriteOps = 0;
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited), busiestTagFractionalBusyness(0),
busiestTagRate(0) {
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
}
@@ -534,7 +538,6 @@ struct RatekeeperData {
Database db;
Map<UID, StorageQueueInfo> storageQueueInfo;
int validSS = 0, numBusySS = 0;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, ProxyInfo> proxyInfo;
@@ -548,6 +551,7 @@ struct RatekeeperData {
double lastWarning;
double lastSSListFetchedTimestamp;
double lastBusiestCommitTagPick = 0;
RkTagThrottleCollection throttledTags;
uint64_t throttledTagChangeId;
@@ -613,9 +617,9 @@ ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageSer
myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
}
myQueueInfo->value.busiestTag = reply.get().busiestTag;
myQueueInfo->value.busiestTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
myQueueInfo->value.busiestTagRate = reply.get().busiestTagRate;
myQueueInfo->value.busiestReadTag = reply.get().busiestTag;
myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate;
} else {
if(myQueueInfo->value.valid) {
TraceEvent("RkStorageServerDidNotRespond", self->id)
@@ -842,6 +846,49 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
}
}
Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
double elapsed = now() - self->lastBusiestCommitTagPick;
if(elapsed <= 0) return Void();
// For each storage server, pick the busiest write tag from the per-tag commit costs accumulated in tagCostEst
for(auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) {
it->value.busiestWriteTag.reset();
TransactionTag busiestTag;
TransactionCommitCostEstimation maxCost;
double maxRate = 0, maxBusyness = 0;
for(const auto& [tag, cost] : it->value.tagCostEst) {
// Commits are sampled at a rate of roughly one op per COMMIT_SAMPLE_BYTE bytes, so scale the sampled op count by COMMIT_SAMPLE_BYTE to estimate the true rate
double rate = CLIENT_KNOBS->COMMIT_SAMPLE_BYTE * cost.getOpsSum() / elapsed;
if(rate > maxRate) {
busiestTag = tag;
maxRate = rate;
maxCost = cost;
}
}
if(maxRate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
it->value.busiestWriteTag = busiestTag;
ASSERT(it->value.totalWriteBytes > 0 && it->value.totalWriteOps > 0);
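// Busyness is the average of this tag's share of the server's total write bytes and its share of total write ops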
maxBusyness = 0.5 * ((double)maxCost.getBytesSum() / it->value.totalWriteBytes + (double)maxCost.getOpsSum() / it->value.totalWriteOps);
it->value.busiestWriteTagFractionalBusyness = maxBusyness;
it->value.busiestWriteTagRate = maxRate;
}
// reset statistics
it->value.tagCostEst.clear();
it->value.totalWriteOps = 0;
it->value.totalWriteBytes = 0;
TraceEvent("BusiestWriteTag", it->key)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagOps", maxCost.getOpsSum())
.detail("TagBytes", maxCost.getBytesSum())
.detail("TagRate", maxRate)
.detail("TagBusyness", maxBusyness)
.detail("Reported", it->value.busiestWriteTag.present())
.trackLatest(it->key.toString() + "_BusiestWriteTag");
}
self->lastBusiestCommitTagPick = now();
return Void();
}
//ACTOR Future<Void> monitorDDMetricsChanges(RatekeeperData *self, Reference<AsyncVar<ServerDBInfo>> db) {
// state bool isFirstRep = true;
// loop {
@@ -870,13 +917,13 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
//}
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST) {
if(ss.busiestReadTag.present() && ss.busiestReadTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestReadTagRate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled
Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, ss.busiestReadTag.get(), ss.busiestReadTagFractionalBusyness);
if(clientRate.present()) {
TagSet tags;
tags.addTag(ss.busiestTag.get());
tags.addTag(ss.busiestReadTag.get());
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
}
@@ -1010,8 +1057,6 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
ssReasons[ss.id] = ssLimitReason;
}
self->validSS = sscount;
self->numBusySS = writeSaturatedSSCount;
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
@@ -1240,44 +1285,15 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
}
void updateCommitCostEstimation(RatekeeperData* self, UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation) {
// if(self->validSS <= 0) return;
// int opsSum = 0;
// double bytesSum = 0;
// std::multimap<std::pair<double, int>, TransactionTag> costTagReverseIndex;
// for(const auto& [tagName, costEst] : costEstimation) {
// if(self->throttledTags.tagData.count(tagName) == 0) continue;
// int ops = costEst.numClear + costEst.numAtomicWrite + costEst.numWrite;
// opsSum += ops;
// double bytes = costEst.bytesClearEst + costEst.bytesAtomicWrite + costEst.bytesWrite + costEst.numClearShards * self->smoothMeanShardSize.smoothTotal();
// bytesSum += bytes;
// costTagReverseIndex.emplace(std::make_pair(-bytes, -ops), tagName);
// }
//
// ASSERT(self->validSS > 0);
// int throttledNum = std::max(1, (int)(self->throttledTags.tagData.size() * self->numBusySS / self->validSS));
// // calculate fractionalBusyness
// for(auto& [byteOps, tagName] : costTagReverseIndex) { // descending order
// if(throttledNum <= 0) break;
// if(self->throttledTags.manualThrottledTags.count(tagName) > 0) continue;
// double fractionalBusyness = 0.5 * -byteOps.first / (bytesSum+1) + 0.5 * -byteOps.second / (opsSum+1);
// if(self->throttledTags.autoThrottledTags.count(tagName) > 0) { // has been throttled
// self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness);
// }
// else { // new auto-throttled
// double opsRate = -byteOps.second / (costEstimation.find(tagName)->second.existTime + 1);
// if(fractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && opsRate > SERVER_KNOBS->MIN_TAG_COST)
// {
// Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness);
// if(clientRate.present()) {
// TagSet tags;
// tags.addTag(tagName);
//
// self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
// }
// }
// }
// throttledNum --;
// }
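// Accumulate the newly reported per-tag commit costs into each storage server's running totals; refreshStorageServerCommitCost() consumes and clears them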
for(auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++ it) {
auto tagCostIt = costEstimation.find(it->key);
if(tagCostIt == costEstimation.end()) continue;
for(const auto& [tagName, cost] : tagCostIt->second) {
it->value.tagCostEst[tagName] += cost;
it->value.totalWriteBytes += cost.getBytesSum();
it->value.totalWriteOps += cost.getOpsSum();
}
}
}
ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
loop {
@@ -1321,6 +1337,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
self.addActor.send( traceRole(Role::RATEKEEPER, rkInterf.id()) );
self.addActor.send(monitorThrottlingChanges(&self));
RatekeeperData* selfPtr = &self;
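// Periodically recompute the busiest write tag for each storage server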
self.addActor.send(recurring([selfPtr](){refreshStorageServerCommitCost(selfPtr);}, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()).detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));

@@ -498,7 +498,7 @@ public:
previousBusiestTag.reset();
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
if(rate > SERVER_KNOBS->MIN_TAG_PAGES_READ_RATE) {
if(rate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
}
@@ -508,7 +508,7 @@ public:
.detail("TagCost", busiestTagCount)
.detail("TotalSampledCost", intervalTotalSampledCount)
.detail("Reported", previousBusiestTag.present())
.trackLatest(id.toString() + "/BusiestReadTag");
.trackLatest(id.toString() + "_BusiestReadTag");
}
intervalCounts.clear();
@@ -3775,7 +3775,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
self->transactionTagCounter.startNewInterval(self->thisServerID);
self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
self->coreStarted.send( Void() );