Do the auto-throttle ramp up in a better place. Only commit manual throttle limit once. Add some asserts.

This commit is contained in:
A.J. Beamon 2020-05-03 19:15:29 -07:00
parent b33871a16b
commit 31cef6075a

View File

@ -158,7 +158,9 @@ private:
Optional<double> updateAndGetClientRate(Optional<double> requestRate) {
if(limits.expiration > now()) {
clientRate.setTotal(getTargetRate(requestRate));
return clientRate.smoothTotal();
double rate = clientRate.smoothTotal();
ASSERT(rate >= 0);
return rate;
}
else {
return Optional<double>();
@ -181,30 +183,36 @@ public:
tagData = std::move(other.tagData);
}
double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) {
ASSERT(currentBusyness > 0);
if(targetBusyness < 1) {
double targetFraction = targetBusyness * (1-currentBusyness) / ((1-targetBusyness) * currentBusyness);
return requestRate * targetFraction;
}
else {
return std::numeric_limits<double>::max();
}
}
// Returns the TPS rate if the throttle is updated, otherwise returns an empty optional
Optional<double> autoThrottleTag(TransactionTag const& tag, double fractionalBusyness, Optional<double> tpsRate = Optional<double>(), Optional<double> expiration = Optional<double>()) {
ASSERT(!tpsRate.present() || tpsRate.get() >= 0);
ASSERT(!expiration.present() || expiration.get() > now());
auto &throttle = autoThrottledTags[tag];
if(!tpsRate.present()) {
if(now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
tpsRate = 1e7;
tpsRate = std::numeric_limits<double>::max();
}
else if(now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
return Optional<double>();
}
else {
double requestRate = tagData[tag].requestRate.smoothRate();
tpsRate = computeTargetTpsRate(fractionalBusyness, SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, tagData[tag].requestRate.smoothRate());
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
if(now() >= throttle.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) {
double rampLocation = (now() - throttle.lastReduced - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) / SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
targetBusyness = pow(targetBusyness, 1 - rampLocation);
}
double targetFraction = targetBusyness * (1-fractionalBusyness) / ((1-targetBusyness) * fractionalBusyness);
tpsRate = requestRate * targetFraction;
if(now() <= throttle.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME && tpsRate.get() >= throttle.limits.tpsRate) {
if(throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) {
return Optional<double>();
}
@ -218,6 +226,8 @@ public:
expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION;
}
ASSERT(tpsRate.present() && tpsRate.get() >= 0);
TraceEvent("RkSetAutoThrottle")
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
@ -313,10 +323,23 @@ public:
if(autoItr != autoThrottledTags.end()) {
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if(autoClientRate.present()) {
double adjustedRate = autoClientRate.get();
if(now() >= autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) {
TEST(true); // Tag auto-throttle ramping up
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
if(targetBusyness == 0) {
targetBusyness = 0.01;
}
double rampLocation = (now() - autoItr->second.lastReduced - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) / targetBusyness;
adjustedRate = computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate);
}
tagPresent = true;
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(tagItr->first, autoClientRate.get(), autoItr->second.limits.expiration);
if(!result.second && result.first->second.tpsRate > autoClientRate.get()) {
result.first->second = ClientTagThrottleLimits(autoClientRate.get(), autoItr->second.limits.expiration);
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if(!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
}
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
@ -337,6 +360,8 @@ public:
}
void addRequests(TransactionTag const& tag, int requests) {
ASSERT(requests > 0);
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
@ -627,6 +652,7 @@ ACTOR Future<Void> monitorServerListChange(
}
ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
state bool committed = false;
loop {
state ReadYourWritesTransaction tr(self->db);
@ -638,9 +664,11 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
state Future<Standalone<RangeResultRef>> throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY);
state Future<Optional<Value>> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey);
BinaryWriter limitWriter(Unversioned());
limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
tr.set(tagThrottleLimitKey, limitWriter.toValue());
if(!committed) {
BinaryWriter limitWriter(Unversioned());
limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
tr.set(tagThrottleLimitKey, limitWriter.toValue());
}
wait(success(throttledTagKeys) && success(autoThrottlingEnabled));
@ -699,6 +727,8 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
wait(tr.commit());
committed = true;
wait(watchFuture);
TraceEvent("RatekeeperThrottleSignaled");
break;