mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 10:45:56 +08:00
Add a bunch of TEST macros and some other little things
This commit is contained in:
parent
31cef6075a
commit
bb3d4b6b89
@ -801,6 +801,7 @@ void DatabaseContext::expireThrottles() {
|
||||
for(auto &priorityItr : throttledTags) {
|
||||
for(auto tagItr = priorityItr.second.begin(); tagItr != priorityItr.second.end();) {
|
||||
if(tagItr->second.expired()) {
|
||||
TEST(true); // Expiring client throttle
|
||||
tagItr = priorityItr.second.erase(tagItr);
|
||||
}
|
||||
else {
|
||||
@ -2538,6 +2539,7 @@ double Transaction::getBackoff(int errCode) {
|
||||
if(priorityItr != cx->throttledTags.end()) {
|
||||
auto tagItr = priorityItr->second.find(tag);
|
||||
if(tagItr != priorityItr->second.end()) {
|
||||
TEST(true); // Returning throttle backoff
|
||||
returnedBackoff = std::min(CLIENT_KNOBS->TAG_THROTTLE_RECHECK_INTERVAL, std::max(returnedBackoff, tagItr->second.throttleDuration()));
|
||||
if(returnedBackoff == CLIENT_KNOBS->TAG_THROTTLE_RECHECK_INTERVAL) {
|
||||
break;
|
||||
@ -3219,9 +3221,11 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
|
||||
for(auto& tag : tags) {
|
||||
auto itr = v.tagThrottleInfo.find(tag.first);
|
||||
if(itr == v.tagThrottleInfo.end()) {
|
||||
TEST(true); // Removing client throttle
|
||||
priorityThrottledTags.erase(tag.first);
|
||||
}
|
||||
else {
|
||||
TEST(true); // Setting client throttle
|
||||
auto result = priorityThrottledTags.try_emplace(tag.first, itr->second);
|
||||
if(!result.second) {
|
||||
result.first->second.update(itr->second);
|
||||
@ -3342,6 +3346,7 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, TransactionPriorit
|
||||
priorityThrottledTags.erase(itr);
|
||||
}
|
||||
else if(itr->second.throttleDuration() > 0) {
|
||||
TEST(true); // Throttling transaction after getting read version
|
||||
++cx->transactionReadVersionsThrottled;
|
||||
throw tag_throttled();
|
||||
}
|
||||
@ -3403,10 +3408,14 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
||||
}
|
||||
|
||||
if(maxThrottleDelay > 0.0 && !canRecheck) { // TODO: allow delaying?
|
||||
TEST(true); // Throttling tag before GRV request
|
||||
++cx->transactionReadVersionsThrottled;
|
||||
readVersion = tag_throttled();
|
||||
return readVersion;
|
||||
}
|
||||
else {
|
||||
TEST(maxThrottleDelay > 0.0); // Rechecking throttle
|
||||
}
|
||||
|
||||
for(auto &tag : options.tags) {
|
||||
auto itr = priorityThrottledTags.find(tag);
|
||||
|
@ -26,6 +26,7 @@
|
||||
|
||||
void TagSet::addTag(TransactionTagRef tag) {
|
||||
ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); // Tag length is encoded with a single byte
|
||||
ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION < 256); // Number of tags is encoded with a single byte
|
||||
|
||||
if(tag.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) {
|
||||
throw tag_too_long();
|
||||
|
@ -1408,10 +1408,12 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
|
||||
auto tagItr = priorityThrottledTags.find(tag.first);
|
||||
if(tagItr != priorityThrottledTags.end()) {
|
||||
if(tagItr->second.expiration > now()) {
|
||||
TEST(true); // Proxy returning tag throttle
|
||||
reply.tagThrottleInfo[tag.first] = tagItr->second;
|
||||
}
|
||||
else {
|
||||
// This isn't required, but we might as well
|
||||
TEST(true); // Proxy expiring tag throttle
|
||||
priorityThrottledTags.erase(tagItr);
|
||||
}
|
||||
}
|
||||
|
@ -163,6 +163,7 @@ private:
|
||||
return rate;
|
||||
}
|
||||
else {
|
||||
TEST(true); // Get throttle rate for expired throttle
|
||||
return Optional<double>();
|
||||
}
|
||||
}
|
||||
@ -207,12 +208,14 @@ public:
|
||||
tpsRate = std::numeric_limits<double>::max();
|
||||
}
|
||||
else if(now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
|
||||
TEST(true); // Tag auto-throttled too quickly
|
||||
return Optional<double>();
|
||||
}
|
||||
else {
|
||||
tpsRate = computeTargetTpsRate(fractionalBusyness, SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, tagData[tag].requestRate.smoothRate());
|
||||
|
||||
if(throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) {
|
||||
TEST(true); // Tag auto-throttle rate increase attempt while active
|
||||
return Optional<double>();
|
||||
}
|
||||
|
||||
@ -254,6 +257,7 @@ public:
|
||||
result.first->second.limits.expiration = expiration;
|
||||
|
||||
if(!oldLimits.present()) {
|
||||
TEST(true); // Transaction tag manually throttled
|
||||
TraceEvent("RatekeeperAddingManualThrottle")
|
||||
.detail("Tag", tag)
|
||||
.detail("Rate", tpsRate)
|
||||
@ -261,6 +265,7 @@ public:
|
||||
.detail("SecondsToExpiration", expiration - now());
|
||||
}
|
||||
else if(oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) {
|
||||
TEST(true); // Manual transaction tag throttle updated
|
||||
TraceEvent("RatekeeperUpdatingManualThrottle")
|
||||
.detail("Tag", tag)
|
||||
.detail("Rate", tpsRate)
|
||||
@ -299,21 +304,30 @@ public:
|
||||
if(priorityItr != manualItr->second.end()) {
|
||||
Optional<double> priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate);
|
||||
if(!priorityClientRate.present()) {
|
||||
TEST(true); // Manual priority throttle expired
|
||||
priorityItr = manualItr->second.erase(priorityItr);
|
||||
}
|
||||
else if(!manualClientRate.present() || manualClientRate.get().tpsRate > priorityClientRate.get()) {
|
||||
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), priorityItr->second.limits.expiration);
|
||||
else {
|
||||
if(!manualClientRate.present() || manualClientRate.get().tpsRate > priorityClientRate.get()) {
|
||||
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), priorityItr->second.limits.expiration);
|
||||
}
|
||||
else {
|
||||
TEST(true); // Manual throttle overriden by higher priority
|
||||
}
|
||||
|
||||
++priorityItr;
|
||||
}
|
||||
}
|
||||
|
||||
if(manualClientRate.present()) {
|
||||
tagPresent = true;
|
||||
TEST(true); // Using manual throttle
|
||||
clientRates[*priority][tagItr->first] = manualClientRate.get();
|
||||
}
|
||||
}
|
||||
|
||||
if(manualItr->second.empty()) {
|
||||
TEST(true); // All manual throttles expired
|
||||
manualThrottledTags.erase(manualItr);
|
||||
break;
|
||||
}
|
||||
@ -341,14 +355,19 @@ public:
|
||||
if(!result.second && result.first->second.tpsRate > adjustedRate) {
|
||||
result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
|
||||
}
|
||||
else {
|
||||
TEST(true); // Auto throttle overriden by manual throttle
|
||||
}
|
||||
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
|
||||
}
|
||||
else {
|
||||
TEST(true); // Auto throttle expired
|
||||
autoThrottledTags.erase(autoItr);
|
||||
}
|
||||
}
|
||||
|
||||
if(!tagPresent) {
|
||||
TEST(true); // All tag throttles expired
|
||||
tagItr = tagData.erase(tagItr);
|
||||
}
|
||||
else {
|
||||
@ -360,6 +379,7 @@ public:
|
||||
}
|
||||
|
||||
void addRequests(TransactionTag const& tag, int requests) {
|
||||
TEST(true); // Requests reported for throttled tag
|
||||
ASSERT(requests > 0);
|
||||
|
||||
auto tagItr = tagData.try_emplace(tag);
|
||||
@ -673,18 +693,21 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
||||
wait(success(throttledTagKeys) && success(autoThrottlingEnabled));
|
||||
|
||||
if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
|
||||
TEST(true); // Auto-throttling disabled
|
||||
if(self->autoThrottlingEnabled) {
|
||||
TraceEvent("AutoTagThrottlingDisabled");
|
||||
}
|
||||
self->autoThrottlingEnabled = false;
|
||||
}
|
||||
else if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
|
||||
TEST(true); // Auto-throttling enabled
|
||||
if(!self->autoThrottlingEnabled) {
|
||||
TraceEvent("AutoTagThrottlingEnabled");
|
||||
}
|
||||
self->autoThrottlingEnabled = true;
|
||||
}
|
||||
else {
|
||||
TEST(true); // Auto-throttling unspecified
|
||||
if(autoThrottlingEnabled.get().present()) {
|
||||
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue").detail("Value", autoThrottlingEnabled.get().get());
|
||||
}
|
||||
@ -701,6 +724,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
||||
ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported
|
||||
|
||||
if(tagValue.expirationTime == 0 || tagValue.expirationTime > now() + tagValue.initialDuration) {
|
||||
TEST(true); // Converting tag throttle duration to absolute time
|
||||
tagValue.expirationTime = now() + tagValue.initialDuration;
|
||||
BinaryWriter wr(IncludeVersion());
|
||||
wr << tagValue;
|
||||
@ -731,6 +755,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
||||
|
||||
wait(watchFuture);
|
||||
TraceEvent("RatekeeperThrottleSignaled");
|
||||
TEST(true); // Tag throttle changes detected
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
@ -739,10 +764,12 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
||||
}
|
||||
}
|
||||
|
||||
void autoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagThrottleCollection& throttledTags) {
|
||||
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagThrottleCollection& throttledTags) {
|
||||
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS
|
||||
&& ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST && throttledTags.autoThrottleCount() <= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS)
|
||||
{
|
||||
TEST(true); // Transaction tag auto-throttled
|
||||
|
||||
Optional<double> clientRate = self->throttledTags.autoThrottleTag(ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
|
||||
if(clientRate.present()) {
|
||||
TagSet tags;
|
||||
@ -822,7 +849,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RkTagThrottleCol
|
||||
|
||||
TraceEvent("RkCheckingAutoThrottle").detail("StorageQueue", storageQueue).detail("BusiestTag", ss.busiestTag.present() ? ss.busiestTag.get() : LiteralStringRef("<none>")).detail("FractionalBusyness", ss.busiestTagFractionalBusyness).detail("BusiestTagRate", ss.busiestTagRate).detail("AutoThrottledTags", throttledTags.autoThrottleCount());
|
||||
if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
|
||||
autoThrottleTag(self, ss, throttledTags);
|
||||
tryAutoThrottleTag(self, ss, throttledTags);
|
||||
}
|
||||
|
||||
double inputRate = ss.smoothInputBytes.smoothRate();
|
||||
@ -1198,6 +1225,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
||||
p.lastTagPushTime = now();
|
||||
|
||||
reply.throttledTags = self.throttledTags.getClientRates();
|
||||
TEST(reply.throttledTags.present() && reply.throttledTags.get().size() > 0); // Returning tag throttles to a proxy
|
||||
}
|
||||
|
||||
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
|
||||
|
@ -56,7 +56,7 @@ struct ClientTagThrottleLimits {
|
||||
double tpsRate;
|
||||
double expiration;
|
||||
|
||||
ClientTagThrottleLimits() {}
|
||||
ClientTagThrottleLimits() : tpsRate(0), expiration(0) {}
|
||||
ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {}
|
||||
|
||||
template <class Archive>
|
||||
|
@ -474,6 +474,7 @@ public:
|
||||
|
||||
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
if(tags.present()) {
|
||||
TEST(true); // Tracking tag on storage server
|
||||
double cost = costFunction(bytes);
|
||||
for(auto& tag : tags.get()) {
|
||||
int64_t &count = intervalCounts[TransactionTag(tag, tags.get().arena)];
|
||||
|
Loading…
x
Reference in New Issue
Block a user