Checkpointing a bunch of work on throttles. Rudimentary implementation of auto-throttling. Support for manual throttling via fdbcli. Throttles are stored in the system keyspace.

This commit is contained in:
A.J. Beamon 2020-04-03 15:24:14 -07:00
parent e0424a52f8
commit 2336f073ad
16 changed files with 468 additions and 34 deletions

View File

@ -267,6 +267,8 @@
"transactions_per_second_limit":0,
"batch_released_transactions_per_second":0,
"released_transactions_per_second":0,
"batch_tags_throttled":0,
"tags_throttled":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,
"limiting_version_lag_storage_server":0,

View File

@ -30,6 +30,7 @@
#include "fdbclient/Schemas.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/TagThrottle.h"
#include "flow/DeterministicRandom.h"
#include "flow/Platform.h"
@ -565,6 +566,11 @@ void initHelp() {
"consistencycheck [on|off]",
"permits or prevents consistency checking",
"Calling this command with `on' permits consistency check processes to run and `off' will halt their checking. Calling this command with no arguments will display if consistency checking is currently allowed.\n");
helpMap["throttle"] = CommandHelp(
"throttle <on|off|enable auto|disable auto|list> [ARGS]",
"view and control throttled tags",
"Use `on' and `off' to manually throttle or unthrottle tags. Use `enable auto' or `disable auto' to enable or disable automatic tag throttling. Use `list' to print the list of throttled tags.\n"
);
hiddenCommands.insert("expensive_data_check");
hiddenCommands.insert("datadistribution");
@ -3682,6 +3688,159 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if(tokencmp(tokens[0], "throttle")) {
	// Handles `throttle <on|off|enable auto|disable auto|list> [ARGS]'.
	// Every sub-branch checks tokens.size() before indexing into tokens, prints its own
	// usage text on malformed input, and sets is_error on failure.
	if(tokens.size() == 1) {
		printUsage(tokens[0]);
		is_error = true;
		continue;
	}
	else if(tokencmp(tokens[1], "list")) {
		// throttle list [LIMIT] [PREFIX]
		if(tokens.size() > 4) {
			printf("Usage: throttle list [LIMIT] [PREFIX]\n");
			printf("\n");
			printf("Lists tags that are currently throttled, optionally limited to a certain tag PREFIX.\n");
			printf("The default LIMIT is 100 tags, and by default all tags will be searched.\n");
			is_error = true;
			continue;
		}

		// These are read again after the wait() below, so they must be actor state
		state int throttleListLimit = 100;
		state StringRef prefix;
		if(tokens.size() >= 3) {
			char *end;
			throttleListLimit = std::strtol((const char*)tokens[2].begin(), &end, 10);
			// The number must be followed by whitespace when another token follows it, or by the
			// terminator when it is the last token. isspace() requires an unsigned char value.
			if ((tokens.size() > 3 && !std::isspace((unsigned char)*end)) || (tokens.size() == 3 && *end != '\0')) {
				printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[2]).c_str());
				is_error = true;
				continue;
			}
		}
		if(tokens.size() >= 4) {
			prefix = tokens[3];
		}

		std::map<Standalone<StringRef>, TagThrottleInfo> tags = wait(ThrottleApi::getTags(db, throttleListLimit, prefix));

		std::string prefixString = "";
		if(prefix.size() > 0) {
			prefixString = format(" with prefix `%s'", prefix.toString().c_str());
		}

		if(tags.size() > 0) {
			printf("Throttled tags%s:\n\n", prefixString.c_str());
			printf(" Rate | Expiration (s) | Priority | Type | Tag\n");
			printf(" ------+----------------+-----------+--------+------------------\n");
			for(auto itr = tags.begin(); itr != tags.end(); ++itr) {
				printf(" %3d%% | %13ds | %9s | %6s | %s\n",
				       (int)(itr->second.rate*100),
				       (int)(itr->second.expiration-now()),
				       TagThrottleInfo::priorityToString(itr->second.priority),
				       itr->second.autoThrottled ? "auto" : "manual",
				       itr->first.substr(tagThrottleKeysPrefix.size()).toString().c_str());
			}

			if(tags.size() == throttleListLimit) {
				printf("\nThe tag limit `%d' was reached. Use the [LIMIT] or [PREFIX] arguments to view additional tags.\n", throttleListLimit);
				printf("Usage: throttle list [LIMIT] [PREFIX]\n");
			}
		}
		else {
			printf("There are no throttled tags%s\n", prefixString.c_str());
		}
	}
	else if(tokencmp(tokens[1], "on") && tokens.size() <=6) {
		// throttle on tag <TAG> [RATE] [DURATION]
		if(tokens.size() < 4 || !tokencmp(tokens[2], "tag")) {
			printf("Usage: throttle on tag <TAG> [RATE] [DURATION]\n");
			printf("\n");
			printf("Enables throttling for transactions with the specified tag.\n");
			printf("An optional throttling rate (out of 1.0) can be specified (default 1.0).\n");
			printf("An optional duration can be specified, which must include a time suffix (s, m, h, d) (default 1h).\n");
			is_error = true;
			continue;
		}

		double rate = 1.0;
		uint64_t duration = 3600; // seconds (the documented default of 1h)

		if(tokens.size() >= 5) {
			char *end;
			rate = std::strtod((const char*)tokens[4].begin(), &end);
			// Same trailing-character check as for the list limit
			if((tokens.size() > 5 && !std::isspace((unsigned char)*end)) || (tokens.size() == 5 && *end != '\0')) {
				printf("ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str());
				is_error = true;
				continue;
			}
			if(rate <= 0 || rate > 1) {
				printf("ERROR: invalid rate `%f'; must satisfy 0 < rate <= 1\n", rate);
				is_error = true;
				continue;
			}
		}
		if(tokens.size() == 6) {
			Optional<uint64_t> parsedDuration = parseDuration(tokens[5].toString());
			if(!parsedDuration.present()) {
				printf("ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str());
				is_error = true;
				continue;
			}
			duration = parsedDuration.get();
		}

		wait(ThrottleApi::throttleTag(db, tokens[3], rate, now()+duration, false)); // TODO: express in versions or somehow deal with time?
		printf("Tag `%s' has been throttled\n", tokens[3].toString().c_str());
	}
	else if(tokencmp(tokens[1], "off")) {
		// throttle off <all|auto|manual|tag> [TAG]
		// The size check comes first in every condition so that a bare `throttle off'
		// (two tokens) never reads tokens[2].
		if(tokens.size() == 4 && tokencmp(tokens[2], "tag")) {
			bool success = wait(ThrottleApi::unthrottleTag(db, tokens[3]));
			if(success) {
				printf("Unthrottled tag `%s'\n", tokens[3].toString().c_str());
			}
			else {
				printf("Tag `%s' was not throttled\n", tokens[3].toString().c_str());
			}
		}
		else if(tokens.size() == 3 && tokencmp(tokens[2], "all")) {
			uint64_t unthrottledTags = wait(ThrottleApi::unthrottleAll(db));
			printf("Unthrottled %llu tags\n", (unsigned long long)unthrottledTags);
		}
		else if(tokens.size() == 3 && tokencmp(tokens[2], "auto")) {
			uint64_t unthrottledTags = wait(ThrottleApi::unthrottleAuto(db));
			printf("Unthrottled %llu tags\n", (unsigned long long)unthrottledTags);
		}
		else if(tokens.size() == 3 && tokencmp(tokens[2], "manual")) {
			uint64_t unthrottledTags = wait(ThrottleApi::unthrottleManual(db));
			printf("Unthrottled %llu tags\n", (unsigned long long)unthrottledTags);
		}
		else {
			printf("Usage: throttle off <all|auto|manual|tag> [TAG]\n");
			printf("\n");
			printf("Disables throttling for the specified tag(s).\n");
			printf("Use `all' to turn off all tag throttles, `auto' to turn off throttles created by\n");
			printf("the cluster, and `manual' to turn off throttles created manually. Use `tag <TAG>'\n");
			printf("to turn off throttles for a specific tag\n");
			is_error = true;
		}
	}
	else if(tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) {
		// throttle <enable|disable> auto
		// The argument check lives inside the branch so that malformed input reaches the
		// sub-command usage text; the size test short-circuits before tokens[2] is read.
		if(tokens.size() != 3 || !tokencmp(tokens[2], "auto")) {
			printf("Usage: throttle <enable|disable> auto\n");
			printf("\n");
			printf("Enables or disable automatic tag throttling.\n");
			is_error = true;
			continue;
		}
		state bool autoTagThrottlingEnabled = tokencmp(tokens[1], "enable");
		wait(ThrottleApi::enableAuto(db, autoTagThrottlingEnabled));
		printf("Automatic tag throttling has been %s\n", autoTagThrottlingEnabled ? "enabled" : "disabled");
	}
	else {
		printUsage(tokens[0]);
		is_error = true;
	}
	continue;
}
printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
is_error = true;
}

View File

@ -61,6 +61,8 @@ set(FDBCLIENT_SRCS
Subspace.h
SystemData.cpp
SystemData.h
TagThrottle.actor.cpp
TagThrottle.h
TaskBucket.actor.cpp
TaskBucket.h
ThreadSafeTransaction.actor.cpp

View File

@ -125,9 +125,17 @@ public:
QueueModel queueModel;
bool enableLocalityLoadBalance;
// A single read-version request queued on a VersionBatcher stream.
struct VersionRequest {
// Promise on which the GetReadVersionReply for this request is delivered; the requester waits on reply.getFuture()
Promise<GetReadVersionReply> reply;
// Transaction tags attached to this request (empty by default)
Standalone<VectorRef<StringRef>> tags;
// Debug ID of the requesting transaction, when it has one
Optional<UID> debugID;
// Both arguments are optional; `reply` is always default-constructed.
VersionRequest(Standalone<VectorRef<StringRef>> tags = Standalone<VectorRef<StringRef>>(), Optional<UID> debugID = Optional<UID>()) : tags(tags), debugID(debugID) {}
};
// Transaction start request batching
struct VersionBatcher {
PromiseStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > stream;
PromiseStream<VersionRequest> stream;
Future<Void> actor;
};
std::map<uint32_t, VersionBatcher> versionBatcher;

View File

@ -988,4 +988,41 @@ struct WorkerBackupStatus {
}
};
// Describes one throttle applied to a transaction tag. Instances are serialized as the
// values of the tag throttle keys and kept in per-priority maps by the ratekeeper.
struct TagThrottleInfo {
	// Transaction priority class a throttle applies to.
	// The enumerator order is part of the serialized format; do not reorder.
	enum class Priority {
		BATCH,
		DEFAULT,
		IMMEDIATE
	};

	// Returns a lowercase, statically allocated name for `priority'.
	// Asserts (and throws internal_error) if the value is not a known enumerator.
	static const char* priorityToString(Priority priority) {
		switch(priority) {
			case Priority::BATCH:
				return "batch";
			case Priority::DEFAULT:
				return "default";
			case Priority::IMMEDIATE:
				return "immediate";
		}

		ASSERT(false);
		throw internal_error();
	}

	double rate;        // throttling rate; fdbcli only accepts values with 0 < rate <= 1
	double expiration;  // time, on the now() clock, at or after which the throttle is treated as expired
	bool autoThrottled; // true for throttles created automatically, false for manual ones
	Priority priority;

	// `priority' is given a definite value here: default-constructed instances are created
	// implicitly (e.g. by std::map::operator[]) and may later be printed or serialized, so it
	// must not be left indeterminate.
	TagThrottleInfo() : rate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT) {}
	TagThrottleInfo(double rate, double expiration, bool autoThrottled, Priority priority) : rate(rate), expiration(expiration), autoThrottled(autoThrottled), priority(priority) {}

	template<class Ar>
	void serialize(Ar& ar) {
		serializer(ar, rate, expiration, autoThrottled, priority);
	}
};
BINARY_SERIALIZABLE(TagThrottleInfo::Priority);
#endif

View File

@ -3071,13 +3071,13 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Standalone<VectorRef<StringRef>> tags, Optional<UID> debugID ) {
try {
++cx->transactionReadVersionBatches;
if( debugID.present() )
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
loop {
state GetReadVersionRequest req( transactionCount, flags, Standalone<VectorRef<StringRef>>(), debugID );
state GetReadVersionRequest req( transactionCount, flags, tags, debugID );
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
@ -3096,7 +3096,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
}
}
ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > versionStream, uint32_t flags ) {
ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<DatabaseContext::VersionRequest> versionStream, uint32_t flags ) {
state std::vector< Promise<GetReadVersionReply> > requests;
state PromiseStream< Future<Void> > addActor;
state Future<Void> collection = actorCollection( addActor.getFuture() );
@ -3104,6 +3104,8 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
state Optional<UID> debugID;
state bool send_batch;
state Standalone<VectorRef<StringRef>> tags;
// dynamic batching
state PromiseStream<double> replyTimes;
state PromiseStream<Error> _errorStream;
@ -3111,18 +3113,21 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
loop {
send_batch = false;
choose {
when(std::pair<Promise<GetReadVersionReply>, Optional<UID>> req = waitNext(versionStream)) {
if (req.second.present()) {
// TODO: we have to rethink how we send batches to the MP to deal with tags
when(DatabaseContext::VersionRequest req = waitNext(versionStream)) {
if (req.debugID.present()) {
if (!debugID.present()) {
debugID = nondeterministicRandom()->randomUniqueID();
}
g_traceBatch.addAttach("TransactionAttachID", req.second.get().first(), debugID.get().first());
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
requests.push_back(req.first);
if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
requests.push_back(req.reply);
tags = req.tags;
//if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
send_batch = true;
else if (!timeout.isValid())
timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
//else if (!timeout.isValid())
//timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
}
when(wait(timeout.isValid() ? timeout : Never())) { send_batch = true; }
// dynamic batching monitors reply latencies
@ -3141,7 +3146,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
addActor.send(ready(timeReply(GRVReply.getFuture(), replyTimes)));
Future<Void> batch = incrementalBroadcastWithError(
getConsistentReadVersion(cx, count, flags, std::move(debugID)),
getConsistentReadVersion(cx, count, flags, tags, std::move(debugID)),
std::vector<Promise<GetReadVersionReply>>(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
debugID = Optional<UID>();
requests = std::vector< Promise<GetReadVersionReply> >();
@ -3208,10 +3213,10 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), flags );
}
Promise<GetReadVersionReply> p;
batcher.stream.send( std::make_pair( p, info.debugID ) );
auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, p.getFuture(), options.lockAware, startTime, metadataVersion);
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion);
}
return readVersion;
}

View File

@ -295,6 +295,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"transactions_per_second_limit":0,
"batch_released_transactions_per_second":0,
"released_transactions_per_second":0,
"batch_tags_throttled":0,
"tags_throttled":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,
"limiting_version_lag_storage_server":0,

View File

@ -562,6 +562,20 @@ const KeyRef moveKeysLockWriteKey = LiteralStringRef("\xff/moveKeysLock/Write");
const KeyRef dataDistributionModeKey = LiteralStringRef("\xff/dataDistributionMode");
const UID dataDistributionModeLock = UID(6345,3425);
// Keys to view and control tag throttling
const KeyRangeRef tagThrottleKeys = KeyRangeRef(
LiteralStringRef("\xff\x02/throttledTags/tag/"),
LiteralStringRef("\xff\x02/throttledTags/tag0"));
const KeyRef tagThrottleKeysPrefix = tagThrottleKeys.begin;
const KeyRef tagThrottleSignalKey = LiteralStringRef("\xff\x02/throttledTags/signal");
const KeyRef tagThrottleAutoEnabledKey = LiteralStringRef("\xff\x02/throttledTags/autoThrottlingEnabled");
// Decodes a serialized TagThrottleInfo (written with IncludeVersion()) from `value'.
TagThrottleInfo decodeTagThrottleValue(const ValueRef& value) {
	BinaryReader valueReader(value, IncludeVersion());
	TagThrottleInfo decoded;
	valueReader >> decoded;
	return decoded;
}
// Client status info prefix
const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientInfo/"), LiteralStringRef("\xff\x02/fdbClientInfo0"));
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_sample_rate/");

View File

@ -212,6 +212,12 @@ extern const KeyRef moveKeysLockOwnerKey, moveKeysLockWriteKey;
extern const KeyRef dataDistributionModeKey;
extern const UID dataDistributionModeLock;
// Keys to view and control tag throttling
extern const KeyRangeRef tagThrottleKeys;
extern const KeyRef tagThrottleKeysPrefix;
extern const KeyRef tagThrottleSignalKey;
extern const KeyRef tagThrottleAutoEnabledKey;
TagThrottleInfo decodeTagThrottleValue(const ValueRef& value);
// Log Range constant variables
// \xff/logRanges/[16-byte UID][begin key] := serialize( make_pair([end key], [destination key prefix]), IncludeVersion() )

View File

@ -91,6 +91,7 @@
<ActorCompiler Include="RestoreWorkerInterface.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="TagThrottle.h" />
<ClInclude Include="TaskBucket.h" />
<ClInclude Include="ThreadSafeTransaction.h" />
<ClInclude Include="Tuple.h" />
@ -129,6 +130,7 @@
<ClCompile Include="Schemas.cpp" />
<ClCompile Include="SystemData.cpp" />
<ClCompile Include="sha1\SHA1.cpp" />
<ActorCompiler Include="TagThrottle.actor.cpp" />
<ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
<ActorCompiler Include="TaskBucket.actor.cpp" />
<ClCompile Include="Subspace.cpp" />

View File

@ -471,6 +471,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DURABILITY_LAG_INCREASE_RATE, 1.001 );
init( STORAGE_SERVER_LIST_FETCH_TIMEOUT, 20.0 );
init( MAX_THROTTLED_TAGS, 10 ); if(randomize && BUGGIFY) MAX_THROTTLED_TAGS = 1;
init( MIN_TAG_BUSYNESS, 0.1 ); if(randomize && BUGGIFY) MIN_TAG_BUSYNESS = 0.0;
init( TAG_THROTTLE_DURATION, 120.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_DURATION = 5.0;
init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
//Storage Metrics
init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 );
init( STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, 1000.0 / STORAGE_METRICS_AVERAGE_INTERVAL ); // milliHz!

View File

@ -384,7 +384,8 @@ public:
int64_t MAX_THROTTLED_TAGS;
double MIN_TAG_BUSYNESS; // initialized with the fractional value 0.1, which an integer knob cannot hold
double TAG_THROTTLE_DURATION;
bool AUTO_TAG_THROTTLING_ENABLED;
// disk snapshot
double SNAP_CREATE_MAX_TIMEOUT;

View File

@ -94,12 +94,18 @@ struct StorageQueueInfo {
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
limitReason_t limitReason;
Optional<Standalone<StringRef>> busiestTag;
double busiestTagFractionalBusyness;
double busiestTagRate;
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited), busiestTagFractionalBusyness(0),
busiestTagRate(0) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
}
@ -164,6 +170,8 @@ struct TransactionCounts {
};
struct RatekeeperData {
Database db;
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
@ -178,18 +186,28 @@ struct RatekeeperData {
double lastWarning;
double lastSSListFetchedTimestamp;
typedef std::map<Standalone<StringRef>, TagThrottleInfo> ThrottleMap;
std::map<TagThrottleInfo::Priority, ThrottleMap> tagThrottles;
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Deque<double> actualTpsHistory;
Optional<Key> remoteDC;
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
bool autoThrottlingEnabled;
RatekeeperData(Database db) : db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
lastWarning(0), lastSSListFetchedTimestamp(now()),
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH)
{}
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
autoThrottlingEnabled(false)
{
tagThrottles.try_emplace(TagThrottleInfo::Priority::IMMEDIATE);
tagThrottles.try_emplace(TagThrottleInfo::Priority::DEFAULT);
tagThrottles.try_emplace(TagThrottleInfo::Priority::BATCH);
}
};
//SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
@ -222,6 +240,10 @@ ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageSer
myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
}
myQueueInfo->value.busiestTag = reply.get().busiestTag;
myQueueInfo->value.busiestTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
myQueueInfo->value.busiestTagRate = reply.get().busiestTagRate;
} else {
if(myQueueInfo->value.valid) {
TraceEvent("RkStorageServerDidNotRespond", ssi.id());
@ -312,11 +334,9 @@ ACTOR Future<Void> trackEachStorageServer(
ACTOR Future<Void> monitorServerListChange(
RatekeeperData* self,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
state Database db = openDBOnServer(dbInfo, TaskPriority::Ratekeeper, true, true);
state std::map<UID, StorageServerInterface> oldServers;
state Transaction tr(db);
state Transaction tr(self->db);
loop {
try {
@ -345,7 +365,7 @@ ACTOR Future<Void> monitorServerListChange(
}
oldServers.swap(newServers);
tr = Transaction(db);
tr = Transaction(self->db);
wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
} catch(Error& e) {
wait( tr.onError(e) );
@ -353,7 +373,96 @@ ACTOR Future<Void> monitorServerListChange(
}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
// Returns a copy of `str' whose first character is upper-cased when it is an ASCII
// lowercase letter ('a'..'z'); any other input, including the empty string, is returned unchanged.
std::string capitalize(std::string str) {
	// Uses && rather than the `and' alternative token, which not every supported compiler
	// accepts in its default mode.
	if(!str.empty() && str[0] >= 'a' && str[0] <= 'z') {
		str[0] += 'A' - 'a';
	}
	return str;
}
// Keeps self->tagThrottles and self->autoThrottlingEnabled in sync with the system keyspace.
// Each pass reads every key in tagThrottleKeys plus tagThrottleAutoEnabledKey, clears entries
// that have expired (or that are auto throttles while auto-throttling is disabled), replaces
// self->tagThrottles with the surviving entries, commits, and then waits on a watch of
// tagThrottleSignalKey before reading again. Never returns normally.
ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
	loop {
		state ReadYourWritesTransaction tr(self->db);

		loop {
			try {
				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

				state Future<Standalone<RangeResultRef>> throttledTags = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY);
				state Future<Optional<Value>> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey);

				wait(success(throttledTags) && success(autoThrottlingEnabled));
				ASSERT(!throttledTags.get().more && throttledTags.get().size() < CLIENT_KNOBS->TOO_MANY); // TODO: impose throttled tag limit

				// "0" / "1" explicitly disable / enable auto-throttling; a missing or unrecognized value falls back to the knob
				if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
					if(self->autoThrottlingEnabled) {
						TraceEvent("AutoTagThrottlingDisabled");
					}
					self->autoThrottlingEnabled = false;
				}
				else if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
					if(!self->autoThrottlingEnabled) {
						TraceEvent("AutoTagThrottlingEnabled");
					}
					self->autoThrottlingEnabled = true;
				}
				else {
					if(autoThrottlingEnabled.get().present()) {
						TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue").detail("Value", autoThrottlingEnabled.get().get());
					}
					self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
				}

				TraceEvent("RatekeeperReadThrottles").detail("NumThrottledTags", throttledTags.get().size());

				std::map<TagThrottleInfo::Priority, RatekeeperData::ThrottleMap> newThrottles;

				// Per-priority cursor into the previous throttle map, used to detect new or tightened throttles.
				// Iterate by reference: iterators taken from a copied map entry would dangle as soon as the copy is destroyed.
				std::map<TagThrottleInfo::Priority, std::pair<RatekeeperData::ThrottleMap::iterator, RatekeeperData::ThrottleMap::iterator>> oldThrottleIterators;
				for(auto &t : self->tagThrottles) {
					oldThrottleIterators[t.first] = std::make_pair(t.second.begin(), t.second.end());
				}

				for(auto entry : throttledTags.get()) {
					StringRef tag = entry.key.substr(tagThrottleKeysPrefix.size());
					TagThrottleInfo throttleInfo = decodeTagThrottleValue(entry.value);
					TraceEvent("RatekeeperReadThrottleRead").detail("Tag", tag).detail("Expiration", throttleInfo.expiration);
					if((!self->autoThrottlingEnabled && throttleInfo.autoThrottled) || throttleInfo.expiration <= now()) { // TODO: keep or delete auto throttles when disabling auto-throttling
						// Clear the stored system key; `tag' has had the key prefix stripped and does not name it
						tr.clear(entry.key);
					}
					else {
						// Taken by reference so the cursor keeps its position from one tag to the next;
						// the range read yields keys in order, so the cursor only ever needs to move forward.
						auto &oldItr = oldThrottleIterators[throttleInfo.priority];
						while(oldItr.first != oldItr.second && oldItr.first->first < tag) {
							++oldItr.first;
						}

						if(oldItr.first == oldItr.second || oldItr.first->first != tag || oldItr.first->second.rate < throttleInfo.rate * 0.95) {
							TraceEvent("RatekeeperDetectedThrottle")
								.detail("Tag", tag)
								.detail("Rate", throttleInfo.rate)
								.detail("Priority", capitalize(TagThrottleInfo::priorityToString(throttleInfo.priority)))
								.detail("SecondsToExpiration", throttleInfo.expiration - now())
								.detail("AutoThrottled", throttleInfo.autoThrottled);
						}

						newThrottles[throttleInfo.priority][tag] = throttleInfo;
					}
				}

				self->tagThrottles = newThrottles;

				// Register the watch before committing, then block until the signal key changes
				state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
				wait(tr.commit());
				wait(watchFuture);
				TraceEvent("RatekeeperThrottleSignaled");
				break;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RatekeeperData::ThrottleMap& throttledTags) {
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
@ -420,6 +529,29 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
// Auto-throttle the busiest tag on this storage server once its queue passes half of the
// target, provided the tag is busy enough and fewer than 10 tags are throttled at this priority.
// TODO: limit the number of throttles active for a storage server
// NOTE(review): the literals 10 and 120.0 below equal the defaults of
// SERVER_KNOBS->MAX_THROTTLED_TAGS and SERVER_KNOBS->TAG_THROTTLE_DURATION; presumably those knobs are meant to be used here.
if(storageQueue > targetBytes / 2.0 && ss.busiestTag.present() && ss.busiestTagFractionalBusyness > 0.2
   && ss.busiestTagRate > 1000 && throttledTags.size() < 10) {
	// operator[] default-constructs the entry (expiration 0) when the tag is not yet throttled
	auto &throttle = throttledTags[ss.busiestTag.get()];
	double throttleRate = (storageQueue - targetBytes / 2.0) / targetBytes / 2.0;

	// Only log when the throttle is new/expired or noticeably stronger than the current one
	if(throttle.expiration <= now() || throttle.rate < throttleRate * 0.95) {
		TraceEvent(format("RatekeeperThrottlingTag%s", limits->context).c_str())
			.detail("Tag", ss.busiestTag.get())
			.detail("ThrottleRate", throttleRate);
	}

	if(throttle.expiration <= now()) {
		throttle.rate = throttleRate;
	}
	else {
		// An active throttle is only ever raised, never lowered. The maximum belongs in `rate':
		// `expiration' is unconditionally reset just below.
		throttle.rate = std::max(throttle.rate, throttleRate);
	}

	throttle.expiration = now() + 120.0;
}
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
@ -687,14 +819,14 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
.detail("TagsThrottled", throttledTags.size())
.trackLatest(name);
}
}
ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo, DatabaseConfiguration* conf) {
state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
loop {
state ReadYourWritesTransaction tr(cx);
state ReadYourWritesTransaction tr(self->db);
loop {
try {
@ -703,7 +835,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
conf->fromKeyValues( (VectorRef<KeyValueRef>) results );
self->configuration.fromKeyValues( (VectorRef<KeyValueRef>) results );
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey);
wait( tr.commit() );
@ -717,7 +849,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state RatekeeperData self;
state RatekeeperData self(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;
@ -726,12 +858,14 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
TraceEvent("RatekeeperStarting", rkInterf.id());
self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) );
self.addActor.send( configurationMonitor(dbInfo, &self.configuration) );
self.addActor.send( configurationMonitor(&self) );
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
self.addActor.send( monitorServerListChange(&self, dbInfo, serverChanges) );
self.addActor.send( monitorServerListChange(&self, serverChanges) );
self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
self.addActor.send(monitorThrottlingChanges(&self));
TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
@ -748,8 +882,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
state bool lastLimited = false;
loop choose {
when (wait( timeout )) {
updateRate(&self, &self.normalLimits);
updateRate(&self, &self.batchLimits);
updateRate(&self, &self.normalLimits, self.tagThrottles[TagThrottleInfo::Priority::DEFAULT]);
updateRate(&self, &self.batchLimits, self.tagThrottles[TagThrottleInfo::Priority::BATCH]);
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
double tooOld = now() - 1.0;
@ -781,6 +915,19 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
// Report the rate of every unexpired throttle in the reply and drop expired ones from the maps.
// TODO: avoid iteration every time
for(auto &priorityAndThrottles : self.tagThrottles) {
	auto &throttles = priorityAndThrottles.second;
	auto tagItr = throttles.begin();
	while(tagItr != throttles.end()) {
		if(!(tagItr->second.expiration > now())) {
			// erase() hands back the iterator to the next element
			tagItr = throttles.erase(tagItr);
			continue;
		}
		reply.throttledTags[tagItr->first] = tagItr->second.rate;
		++tagItr;
	}
}
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
reply.healthMetrics.batchLimited = lastLimited;

View File

@ -1709,6 +1709,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
int throttledTags = ratekeeper.getInt("TagsThrottled");
int batchThrottledTags = batchRatekeeper.getInt("TagsThrottled");
int ssCount = ratekeeper.getInt("StorageServers");
int tlogCount = ratekeeper.getInt("TLogs");
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
@ -1739,6 +1741,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
(*qos)["released_transactions_per_second"] = transPerSec;
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
(*qos)["tags_throttled"] = throttledTags;
(*qos)["batch_tags_throttled"] = batchThrottledTags;
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
if(!perfLimit.empty()) {

View File

@ -114,6 +114,45 @@ Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_un
return ret;
}
// Parses a duration with one of the following suffixes and returns the duration in seconds
// s - seconds
// m - minutes
// h - hours
// d - days
//
// If `str' has no suffix, `defaultUnit' is used as the suffix; if that is empty too, parsing fails.
// Returns an empty Optional when `str' does not start with a number, contains a minus sign,
// has an unrecognized suffix, or when the result in seconds would not fit in a uint64_t.
Optional<uint64_t> parseDuration(std::string str, std::string defaultUnit) {
	// strtoull accepts a leading '-' and returns the negated value as a huge unsigned number.
	// No valid duration contains '-', so reject it up front.
	if (str.find('-') != std::string::npos) {
		return Optional<uint64_t>();
	}

	char *endptr;
	uint64_t ret = strtoull(str.c_str(), &endptr, 10);
	if (endptr == str.c_str()) {
		// No digits were consumed
		return Optional<uint64_t>();
	}

	std::string unit;
	if (*endptr == '\0') {
		if (defaultUnit.empty()) {
			return Optional<uint64_t>();
		}
		unit = defaultUnit;
	} else {
		// Everything after the number is the unit
		unit = endptr;
	}

	uint64_t secondsPerUnit;
	if (!unit.compare("s")) {
		secondsPerUnit = 1;
	} else if (!unit.compare("m")) {
		secondsPerUnit = 60;
	} else if (!unit.compare("h")) {
		secondsPerUnit = 60 * 60;
	} else if (!unit.compare("d")) {
		secondsPerUnit = 24 * 60 * 60;
	} else {
		return Optional<uint64_t>();
	}

	// Refuse values whose conversion to seconds would wrap around
	if (ret > ~uint64_t(0) / secondsPerUnit) {
		return Optional<uint64_t>();
	}

	return ret * secondsPerUnit;
}
int vsformat( std::string &outputString, const char* form, va_list args) {
char buf[200];

View File

@ -84,6 +84,7 @@ bool validationIsEnabled(BuggifyType type);
#define EXPENSIVE_VALIDATION (validationIsEnabled(BuggifyType::General) && deterministicRandom()->random01() < P_EXPENSIVE_VALIDATION)
extern Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit = "");
extern Optional<uint64_t> parseDuration(std::string str, std::string defaultUnit = "");
extern std::string format(const char* form, ...);
// On success, returns the number of characters written. On failure, returns a negative number.