From 2336f073adb067bc67b34b027917df98b3d594c0 Mon Sep 17 00:00:00 2001
From: "A.J. Beamon" <ajbeamon@apple.com>
Date: Fri, 3 Apr 2020 15:24:14 -0700
Subject: [PATCH] Checkpointing a bunch of work on throttles. Rudimentary
 implementation of auto-throttling. Support for manual throttling via fdbcli.
 Throttles are stored in the system keyspace.

---
 .../source/mr-status-json-schemas.rst.inc     |   2 +
 fdbcli/fdbcli.actor.cpp                       | 159 +++++++++++++++
 fdbclient/CMakeLists.txt                      |   2 +
 fdbclient/DatabaseContext.h                   |  10 +-
 fdbclient/FDBTypes.h                          |  37 ++++
 fdbclient/NativeAPI.actor.cpp                 |  33 ++--
 fdbclient/Schemas.cpp                         |   2 +
 fdbclient/SystemData.cpp                      |  14 ++
 fdbclient/SystemData.h                        |   6 +
 fdbclient/fdbclient.vcxproj                   |   2 +
 fdbserver/Knobs.cpp                           |   5 +
 fdbserver/Knobs.h                             |   3 +-
 fdbserver/Ratekeeper.actor.cpp                | 183 ++++++++++++++++--
 fdbserver/Status.actor.cpp                    |   4 +
 flow/flow.cpp                                 |  39 ++++
 flow/flow.h                                   |   1 +
 16 files changed, 468 insertions(+), 34 deletions(-)

diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc
index c819729684..a04bbc864c 100644
--- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc
+++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc
@@ -267,6 +267,8 @@
          "transactions_per_second_limit":0,
          "batch_released_transactions_per_second":0,
          "released_transactions_per_second":0,
+         "batch_tags_throttled":0,
+         "tags_throttled":0,
          "limiting_queue_bytes_storage_server":0,
          "worst_queue_bytes_storage_server":0,
          "limiting_version_lag_storage_server":0,
diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp
index e65383af3c..d627340bf4 100644
--- a/fdbcli/fdbcli.actor.cpp
+++ b/fdbcli/fdbcli.actor.cpp
@@ -30,6 +30,7 @@
 #include "fdbclient/Schemas.h"
 #include "fdbclient/CoordinationInterface.h"
 #include "fdbclient/FDBOptions.g.h"
+#include "fdbclient/TagThrottle.h"
 
 #include "flow/DeterministicRandom.h"
 #include "flow/Platform.h"
@@ -565,6 +566,11 @@ void initHelp() {
 		"consistencycheck [on|off]",
 		"permits or prevents consistency checking",
 		"Calling this command with `on' permits consistency check processes to run and `off' will halt their checking. Calling this command with no arguments will display if consistency checking is currently allowed.\n");
+	helpMap["throttle"] = CommandHelp(
+		"throttle <on|off|enable auto|disable auto|list> [ARGS]",
+		"view and control throttled tags",
+		"Use `on' and `off' to manually throttle or unthrottle tags. Use `enable auto' or `disable auto' to enable or disable automatic tag throttling. Use `list' to print the list of throttled tags.\n"
+	);
 
 	hiddenCommands.insert("expensive_data_check");
 	hiddenCommands.insert("datadistribution");
@@ -3682,6 +3688,159 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 					continue;
 				}
 
+				if(tokencmp(tokens[0], "throttle")) {
+					if(tokens.size() == 1) {
+						printUsage(tokens[0]);
+						is_error = true;
+						continue;
+					}
+					else if(tokencmp(tokens[1], "list")) {
+						if(tokens.size() > 4) {
+							printf("Usage: throttle list [LIMIT] [PREFIX]\n");
+							printf("\n");
+							printf("Lists tags that are currently throttled, optionally limited to a certain tag PREFIX.\n");
+							printf("The default LIMIT is 100 tags, and by default all tags will be searched.\n");
+							is_error = true;
+							continue;
+						}
+
+						state int throttleListLimit = 100;
+						state StringRef prefix;
+						if(tokens.size() >= 3) {
+							char *end;
+							throttleListLimit = std::strtol((const char*)tokens[2].begin(), &end, 10);
+							if ((tokens.size() > 3 && !std::isspace(*end)) || (tokens.size() == 3 && *end != '\0')) {
+								printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[2]).c_str());
+								is_error = true;
+								continue;
+							}
+						}
+						if(tokens.size() >= 4) {
+							prefix = tokens[3];
+						}
+
+						std::map<Standalone<StringRef>, TagThrottleInfo> tags = wait(ThrottleApi::getTags(db, throttleListLimit, prefix));
+
+						std::string prefixString = "";
+						if(prefix.size() > 0) {
+							prefixString = format(" with prefix `%s'", prefix.toString().c_str());
+						}
+
+						if(tags.size() > 0) {
+							printf("Throttled tags%s:\n\n", prefixString.c_str());
+							printf("  Rate | Expiration (s) | Priority  | Type   | Tag\n");
+							printf(" ------+----------------+-----------+--------+------------------\n");
+							for(auto itr = tags.begin(); itr != tags.end(); ++itr) {
+								printf("  %3d%% | %13ds | %9s | %6s | %s\n", 
+								       (int)(itr->second.rate*100), 
+									   (int)(itr->second.expiration-now()), 
+									   TagThrottleInfo::priorityToString(itr->second.priority), 
+									   itr->second.autoThrottled ? "auto" : "manual", 
+									   itr->first.substr(tagThrottleKeysPrefix.size()).toString().c_str());
+							}
+
+							if(tags.size() == throttleListLimit) {
+								printf("\nThe tag limit `%d' was reached. Use the [LIMIT] or [PREFIX] arguments to view additional tags.\n", throttleListLimit);
+								printf("Usage: throttle list [LIMIT] [PREFIX]\n");
+							}
+						}
+						else {
+							printf("There are no throttled tags%s\n", prefixString.c_str());
+						}
+					}
+					else if(tokencmp(tokens[1], "on") && tokens.size() <=6) {	
+						if(tokens.size() < 4 || !tokencmp(tokens[2], "tag")) {
+							printf("Usage: throttle on tag <TAG> [RATE] [DURATION]\n");
+							printf("\n");
+							printf("Enables throttling for transactions with the specified tag.\n");
+							printf("An optional throttling rate (out of 1.0) can be specified (default 1.0).\n");
+							printf("An optional duration can be specified, which must include a time suffix (s, m, h, d) (default 1h).\n");
+							is_error = true;
+							continue;
+						}
+
+						double rate = 1.0;
+						uint64_t duration = 3600;
+
+						if(tokens.size() >= 5) {
+							char *end;
+							rate = std::strtod((const char*)tokens[4].begin(), &end);
+							if((tokens.size() > 5 && !std::isspace(*end)) || (tokens.size() == 5 && *end != '\0')) {
+								printf("ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str());
+								is_error = true;
+								continue;
+							}
+							if(rate <= 0 || rate > 1) {
+								printf("ERROR: invalid rate `%f'; must satisfy 0 < rate <= 1\n", rate);
+								is_error = true;
+								continue;
+							}
+						}
+						if(tokens.size() == 6) {
+							// parseDuration() does all of the parsing here, so no strtod-style end pointer is needed.
+							Optional<uint64_t> parsedDuration = parseDuration(tokens[5].toString());
+							if(!parsedDuration.present()) {
+								printf("ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str());
+								is_error = true;
+								continue;
+							}
+							duration = parsedDuration.get();
+						}
+
+						wait(ThrottleApi::throttleTag(db, tokens[3], rate, now()+duration, false)); // TODO: express in versions or somehow deal with time?
+						printf("Tag `%s' has been throttled\n", tokens[3].toString().c_str());
+					}
+					else if(tokencmp(tokens[1], "off")) {
+						if(tokens.size() == 4 && tokencmp(tokens[2], "tag")) { // size is tested first: plain `throttle off' has no tokens[2]
+							bool success = wait(ThrottleApi::unthrottleTag(db, tokens[3]));
+							if(success) {
+								printf("Unthrottled tag `%s'\n", tokens[3].toString().c_str());
+							}
+							else {
+								printf("Tag `%s' was not throttled\n", tokens[3].toString().c_str());
+							}
+						}
+						else if(tokens.size() == 3 && tokencmp(tokens[2], "all")) {
+							uint64_t unthrottledTags = wait(ThrottleApi::unthrottleAll(db));
+							printf("Unthrottled %llu tags\n", (unsigned long long)unthrottledTags); // cast so the argument matches %llu on every platform
+						}
+						else if(tokens.size() == 3 && tokencmp(tokens[2], "auto")) {
+							uint64_t unthrottledTags = wait(ThrottleApi::unthrottleAuto(db));
+							printf("Unthrottled %llu tags\n", (unsigned long long)unthrottledTags);
+						}
+						else if(tokens.size() == 3 && tokencmp(tokens[2], "manual")) {
+							uint64_t unthrottledTags = wait(ThrottleApi::unthrottleManual(db));
+							printf("Unthrottled %llu tags\n", (unsigned long long)unthrottledTags);
+						}
+						else {
+							printf("Usage: throttle off <all|auto|manual|tag> [TAG]\n");
+							printf("\n");
+							printf("Disables throttling for the specified tag(s).\n");
+							printf("Use `all' to turn off all tag throttles, `auto' to turn off throttles created by\n");
+							printf("the cluster, and `manual' to turn off throttles created manually. Use `tag <TAG>'\n");
+							printf("to turn off throttles for a specific tag\n");
+							is_error = true;
+						}
+					}
+					else if(tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) { // arguments are validated below so malformed input gets the specific usage text
+						if(tokens.size() != 3 || !tokencmp(tokens[2], "auto")) {
+							printf("Usage: throttle <enable|disable> auto\n");
+							printf("\n");
+							printf("Enables or disable automatic tag throttling.\n");
+							is_error = true;
+							continue;
+						}
+						state bool autoTagThrottlingEnabled = tokencmp(tokens[1], "enable");
+						wait(ThrottleApi::enableAuto(db, autoTagThrottlingEnabled));
+						printf("Automatic tag throttling has been %s\n", autoTagThrottlingEnabled ? "enabled" : "disabled");
+					}
+					else {
+						printUsage(tokens[0]);
+						is_error = true;
+					}
+					continue;
+				}
+
 				printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
 				is_error = true;
 			}
diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt
index f8ac1f310d..2e11f8500e 100644
--- a/fdbclient/CMakeLists.txt
+++ b/fdbclient/CMakeLists.txt
@@ -61,6 +61,8 @@ set(FDBCLIENT_SRCS
   Subspace.h
   SystemData.cpp
   SystemData.h
+  TagThrottle.actor.cpp
+  TagThrottle.h
   TaskBucket.actor.cpp
   TaskBucket.h
   ThreadSafeTransaction.actor.cpp
diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h
index dad82a6923..3105450105 100644
--- a/fdbclient/DatabaseContext.h
+++ b/fdbclient/DatabaseContext.h
@@ -125,9 +125,17 @@ public:
 	QueueModel queueModel;
 	bool enableLocalityLoadBalance;
 
+	struct VersionRequest { // One transaction's read-version request, as queued on VersionBatcher::stream
+		Promise<GetReadVersionReply> reply; // the requester waits on reply.getFuture() for its GetReadVersionReply
+		Standalone<VectorRef<StringRef>> tags; // transaction tags forwarded into the GetReadVersionRequest
+		Optional<UID> debugID; // when present, attached to the batch's debug ID for transaction tracing
+
+		VersionRequest(Standalone<VectorRef<StringRef>> tags = Standalone<VectorRef<StringRef>>(), Optional<UID> debugID = Optional<UID>()) : tags(tags), debugID(debugID) {}
+	};
+
 	// Transaction start request batching
 	struct VersionBatcher {
-		PromiseStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > stream;
+		PromiseStream<VersionRequest> stream;
 		Future<Void> actor;
 	};
 	std::map<uint32_t, VersionBatcher> versionBatcher;
diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h
index 6111b23138..c6846374f1 100644
--- a/fdbclient/FDBTypes.h
+++ b/fdbclient/FDBTypes.h
@@ -988,4 +988,41 @@ struct WorkerBackupStatus {
 	}
 };
 
+struct TagThrottleInfo { // Describes one tag throttle: its rate, expiration, origin and priority
+	enum class Priority {
+		BATCH,
+		DEFAULT,
+		IMMEDIATE
+	};
+	// Returns the lower-case display name of `priority'; an out-of-range value trips the ASSERT and throws internal_error.
+	static const char* priorityToString(Priority priority) {
+		switch(priority) {
+			case Priority::BATCH:
+				return "batch";
+			case Priority::DEFAULT:
+				return "default";
+			case Priority::IMMEDIATE:
+				return "immediate";
+		}
+
+		ASSERT(false);
+		throw internal_error();
+	}
+
+	double rate; // throttling rate; fdbcli accepts 0 < rate <= 1 and displays it as a percentage
+	double expiration; // time on the now() clock; the throttle counts as expired once expiration <= now()
+	bool autoThrottled; // true for automatically created throttles, false for manual ones
+	Priority priority;
+	// Every member is initialized (priority included) so a default-constructed value is safe to serialize and print.
+	TagThrottleInfo() : rate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT) {}
+	TagThrottleInfo(double rate, double expiration, bool autoThrottled, Priority priority) : rate(rate), expiration(expiration), autoThrottled(autoThrottled), priority(priority) {}
+
+	template<class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, rate, expiration, autoThrottled, priority);
+	}
+};
+
+BINARY_SERIALIZABLE(TagThrottleInfo::Priority);
+
 #endif
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index 0ca8775ff4..45a1c3d7d6 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -3071,13 +3071,13 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
 	}
 }
 
-ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
+ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Standalone<VectorRef<StringRef>> tags, Optional<UID> debugID ) {
 	try {
 		++cx->transactionReadVersionBatches;
 		if( debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
 		loop {
-			state GetReadVersionRequest req( transactionCount, flags, Standalone<VectorRef<StringRef>>(), debugID );
+			state GetReadVersionRequest req( transactionCount, flags, tags, debugID );
 			choose {
 				when ( wait( cx->onMasterProxiesChanged() ) ) {}
 				when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
@@ -3096,7 +3096,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
 	}
 }
 
-ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > versionStream, uint32_t flags ) {
+ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<DatabaseContext::VersionRequest> versionStream, uint32_t flags ) {
 	state std::vector< Promise<GetReadVersionReply> > requests;
 	state PromiseStream< Future<Void> > addActor;
 	state Future<Void> collection = actorCollection( addActor.getFuture() );
@@ -3104,6 +3104,8 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
 	state Optional<UID> debugID;
 	state bool send_batch;
 
+	state Standalone<VectorRef<StringRef>> tags;
+
 	// dynamic batching
 	state PromiseStream<double> replyTimes;
 	state PromiseStream<Error> _errorStream;
@@ -3111,18 +3113,21 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
 	loop {
 		send_batch = false;
 		choose {
-			when(std::pair<Promise<GetReadVersionReply>, Optional<UID>> req = waitNext(versionStream)) {
-				if (req.second.present()) {
+			// TODO: we have to rethink how we send batches to the MP to deal with tags
+			when(DatabaseContext::VersionRequest req = waitNext(versionStream)) {
+				if (req.debugID.present()) {
 					if (!debugID.present()) {
 						debugID = nondeterministicRandom()->randomUniqueID();
 					}
-					g_traceBatch.addAttach("TransactionAttachID", req.second.get().first(), debugID.get().first());
+					g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
 				}
-				requests.push_back(req.first);
-				if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
+				requests.push_back(req.reply);
+				tags = req.tags;
+
+				//if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
 					send_batch = true;
-				else if (!timeout.isValid())
-					timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
+				//else if (!timeout.isValid())
+					//timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
 			}
 			when(wait(timeout.isValid() ? timeout : Never())) { send_batch = true; }
 			// dynamic batching monitors reply latencies
@@ -3141,7 +3146,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
 			addActor.send(ready(timeReply(GRVReply.getFuture(), replyTimes)));
 
 			Future<Void> batch = incrementalBroadcastWithError(
-			    getConsistentReadVersion(cx, count, flags, std::move(debugID)),
+			    getConsistentReadVersion(cx, count, flags, tags, std::move(debugID)),
 			    std::vector<Promise<GetReadVersionReply>>(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
 			debugID = Optional<UID>();
 			requests = std::vector< Promise<GetReadVersionReply> >();
@@ -3208,10 +3213,10 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
 			batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), flags );
 		}
 
-		Promise<GetReadVersionReply> p;
-		batcher.stream.send( std::make_pair( p, info.debugID ) );
+		auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
+		batcher.stream.send(req);
 		startTime = now();
-		readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, p.getFuture(), options.lockAware, startTime, metadataVersion);
+		readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion);
 	}
 	return readVersion;
 }
diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp
index 6ee6ced1cf..f3370c3294 100644
--- a/fdbclient/Schemas.cpp
+++ b/fdbclient/Schemas.cpp
@@ -295,6 +295,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
          "transactions_per_second_limit":0,
          "batch_released_transactions_per_second":0,
          "released_transactions_per_second":0,
+         "batch_tags_throttled":0,
+         "tags_throttled":0,
          "limiting_queue_bytes_storage_server":0,
          "worst_queue_bytes_storage_server":0,
          "limiting_version_lag_storage_server":0,
diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index e63b6969ed..686ed85f21 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -562,6 +562,20 @@ const KeyRef moveKeysLockWriteKey = LiteralStringRef("\xff/moveKeysLock/Write");
 const KeyRef dataDistributionModeKey = LiteralStringRef("\xff/dataDistributionMode");
 const UID dataDistributionModeLock = UID(6345,3425);
 
+// Keys to view and control tag throttling
+const KeyRangeRef tagThrottleKeys = KeyRangeRef(
+	LiteralStringRef("\xff\x02/throttledTags/tag/"),
+	LiteralStringRef("\xff\x02/throttledTags/tag0"));
+const KeyRef tagThrottleKeysPrefix = tagThrottleKeys.begin;
+const KeyRef tagThrottleSignalKey = LiteralStringRef("\xff\x02/throttledTags/signal");
+const KeyRef tagThrottleAutoEnabledKey = LiteralStringRef("\xff\x02/throttledTags/autoThrottlingEnabled");
+TagThrottleInfo decodeTagThrottleValue(const ValueRef& value) { // Deserializes the TagThrottleInfo stored in `value'
+	TagThrottleInfo throttleInfo;
+	BinaryReader reader(value, IncludeVersion()); // the value is read with IncludeVersion()
+	reader >> throttleInfo;
+	return throttleInfo;
+}
+
 // Client status info prefix
 const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientInfo/"), LiteralStringRef("\xff\x02/fdbClientInfo0"));
 const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_sample_rate/");
diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h
index 26c15ac6ad..fdf6f23498 100644
--- a/fdbclient/SystemData.h
+++ b/fdbclient/SystemData.h
@@ -212,6 +212,12 @@ extern const KeyRef moveKeysLockOwnerKey, moveKeysLockWriteKey;
 extern const KeyRef dataDistributionModeKey;
 extern const UID dataDistributionModeLock;
 
+// Keys to view and control tag throttling
+extern const KeyRangeRef tagThrottleKeys;
+extern const KeyRef tagThrottleKeysPrefix;
+extern const KeyRef tagThrottleSignalKey;
+extern const KeyRef tagThrottleAutoEnabledKey;
+TagThrottleInfo decodeTagThrottleValue(const ValueRef& value);
 
 // Log Range constant variables
 // \xff/logRanges/[16-byte UID][begin key] := serialize( make_pair([end key], [destination key prefix]), IncludeVersion() )
diff --git a/fdbclient/fdbclient.vcxproj b/fdbclient/fdbclient.vcxproj
index 555f257509..1ec7c9f43c 100644
--- a/fdbclient/fdbclient.vcxproj
+++ b/fdbclient/fdbclient.vcxproj
@@ -91,6 +91,7 @@
     <ActorCompiler Include="RestoreWorkerInterface.actor.h">
         <EnableCompile>false</EnableCompile>
     </ActorCompiler>
+		<ClInclude Include="TagThrottle.h" />
     <ClInclude Include="TaskBucket.h" />
     <ClInclude Include="ThreadSafeTransaction.h" />
     <ClInclude Include="Tuple.h" />
@@ -129,6 +130,7 @@
     <ClCompile Include="Schemas.cpp" />
     <ClCompile Include="SystemData.cpp" />
     <ClCompile Include="sha1\SHA1.cpp" />
+    <ActorCompiler Include="TagThrottle.actor.cpp" />
     <ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
     <ActorCompiler Include="TaskBucket.actor.cpp" />
     <ClCompile Include="Subspace.cpp" />
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index d280994258..afa7a994f6 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -471,6 +471,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
 	init( DURABILITY_LAG_INCREASE_RATE,                        1.001 );
 	init( STORAGE_SERVER_LIST_FETCH_TIMEOUT,                    20.0 );
 
+	init( MAX_THROTTLED_TAGS,                                     10 ); if(randomize && BUGGIFY) MAX_THROTTLED_TAGS = 1;
+	init( MIN_TAG_BUSYNESS,                                      0.1 ); if(randomize && BUGGIFY) MIN_TAG_BUSYNESS = 0.0;
+	init( TAG_THROTTLE_DURATION,                               120.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_DURATION = 5.0;
+	init( AUTO_TAG_THROTTLING_ENABLED,                          true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
+
 	//Storage Metrics
 	init( STORAGE_METRICS_AVERAGE_INTERVAL,                    120.0 );
 	init( STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS,        1000.0 / STORAGE_METRICS_AVERAGE_INTERVAL );  // milliHz!
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index df62eeec6a..40dc752a13 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -384,7 +384,8 @@ public:
 
 	int64_t MAX_THROTTLED_TAGS;
 	int64_t MIN_TAG_BUSYNESS;
-
+	double TAG_THROTTLE_DURATION;
+	bool AUTO_TAG_THROTTLING_ENABLED;
 
 	// disk snapshot
 	double SNAP_CREATE_MAX_TIMEOUT;
diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp
index 7f2a482f54..6afade29df 100644
--- a/fdbserver/Ratekeeper.actor.cpp
+++ b/fdbserver/Ratekeeper.actor.cpp
@@ -94,12 +94,18 @@ struct StorageQueueInfo {
 	Smoother smoothFreeSpace;
 	Smoother smoothTotalSpace;
 	limitReason_t limitReason;
+
+	Optional<Standalone<StringRef>> busiestTag;
+	double busiestTagFractionalBusyness;
+	double busiestTagRate;
+
 	StorageQueueInfo(UID id, LocalityData locality)
 	  : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
 	    smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
 	    smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
 	    smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
-	    smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
+	    smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited), busiestTagFractionalBusyness(0),
+		busiestTagRate(0) {
 		// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
 		lastReply.instanceID = -1;
 	}
@@ -164,6 +170,8 @@ struct TransactionCounts {
 };
 
 struct RatekeeperData {
+	Database db;
+
 	Map<UID, StorageQueueInfo> storageQueueInfo;
 	Map<UID, TLogQueueInfo> tlogQueueInfo;
 
@@ -178,18 +186,28 @@ struct RatekeeperData {
 	double lastWarning;
 	double lastSSListFetchedTimestamp;
 
+	typedef std::map<Standalone<StringRef>, TagThrottleInfo> ThrottleMap;
+	std::map<TagThrottleInfo::Priority, ThrottleMap> tagThrottles;
+
 	RatekeeperLimits normalLimits;
 	RatekeeperLimits batchLimits;
 
 	Deque<double> actualTpsHistory;
 	Optional<Key> remoteDC;
 
-	RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), 
+	bool autoThrottlingEnabled;
+
+	RatekeeperData(Database db) : db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), 
 		actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
 		lastWarning(0), lastSSListFetchedTimestamp(now()),
 		normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
-		batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH)
-	{}
+		batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
+		autoThrottlingEnabled(false)
+	{
+		tagThrottles.try_emplace(TagThrottleInfo::Priority::IMMEDIATE);
+		tagThrottles.try_emplace(TagThrottleInfo::Priority::DEFAULT);
+		tagThrottles.try_emplace(TagThrottleInfo::Priority::BATCH);
+	}
 };
 
 //SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
@@ -222,6 +240,10 @@ ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageSer
 					myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion);
 					myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
 				}
+
+				myQueueInfo->value.busiestTag = reply.get().busiestTag;
+				myQueueInfo->value.busiestTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
+				myQueueInfo->value.busiestTagRate = reply.get().busiestTagRate;
 			} else {
 				if(myQueueInfo->value.valid) {
 					TraceEvent("RkStorageServerDidNotRespond", ssi.id());
@@ -312,11 +334,9 @@ ACTOR Future<Void> trackEachStorageServer(
 
 ACTOR Future<Void> monitorServerListChange(
 		RatekeeperData* self,
-		Reference<AsyncVar<ServerDBInfo>> dbInfo,
 		PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
-	state Database db = openDBOnServer(dbInfo, TaskPriority::Ratekeeper, true, true);
 	state std::map<UID, StorageServerInterface> oldServers;
-	state Transaction tr(db);
+	state Transaction tr(self->db);
 
 	loop {
 		try {
@@ -345,7 +365,7 @@ ACTOR Future<Void> monitorServerListChange(
 			}
 
 			oldServers.swap(newServers);
-			tr = Transaction(db);
+			tr = Transaction(self->db);
 			wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
 		} catch(Error& e) {
 			wait( tr.onError(e) );
@@ -353,7 +373,96 @@ ACTOR Future<Void> monitorServerListChange(
 	}
 }
 
-void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
+// Returns a copy of `str' whose first character is upper-cased when it is an ASCII lower-case letter.
+std::string capitalize(std::string str) {
+	if(!str.empty() && str[0] >= 'a' && str[0] <= 'z') {
+		str[0] += 'A' - 'a';
+	}
+	return str;
+}
+
+// Mirrors the throttle state stored under tagThrottleKeys / tagThrottleAutoEnabledKey into `self', and re-reads it
+// whenever tagThrottleSignalKey changes. Expired throttles (and auto throttles while auto-throttling is off) are cleared.
+ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
+	loop {
+		state ReadYourWritesTransaction tr(self->db);
+		loop {
+			try {
+				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+				state Future<Standalone<RangeResultRef>> throttledTags = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY);
+				state Future<Optional<Value>> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey);
+
+				wait(success(throttledTags) && success(autoThrottlingEnabled));
+				ASSERT(!throttledTags.get().more && throttledTags.get().size() < CLIENT_KNOBS->TOO_MANY); // TODO: impose throttled tag limit
+				// "0" / "1" explicitly disable / enable auto-throttling; a missing or invalid value falls back to the knob.
+				if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
+					if(self->autoThrottlingEnabled) {
+						TraceEvent("AutoTagThrottlingDisabled");
+					}
+					self->autoThrottlingEnabled = false;
+				}
+				else if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
+					if(!self->autoThrottlingEnabled) {
+						TraceEvent("AutoTagThrottlingEnabled");
+					}
+					self->autoThrottlingEnabled = true;
+				}
+				else {
+					if(autoThrottlingEnabled.get().present()) {
+						TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue").detail("Value", autoThrottlingEnabled.get().get());
+					}
+					self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
+				}
+
+				TraceEvent("RatekeeperReadThrottles").detail("NumThrottledTags", throttledTags.get().size());
+				std::map<TagThrottleInfo::Priority, RatekeeperData::ThrottleMap> newThrottles;
+				std::map<TagThrottleInfo::Priority, std::pair<RatekeeperData::ThrottleMap::iterator, RatekeeperData::ThrottleMap::iterator>> oldThrottleIterators;
+				for(auto &t : self->tagThrottles) { // by reference: iterators taken from a copied entry would dangle
+					oldThrottleIterators[t.first] = std::make_pair(t.second.begin(), t.second.end());
+				}
+
+				for(auto entry : throttledTags.get()) {
+					StringRef tag = entry.key.substr(tagThrottleKeysPrefix.size());
+					TagThrottleInfo throttleInfo = decodeTagThrottleValue(entry.value);
+					TraceEvent("RatekeeperReadThrottleRead").detail("Tag", tag).detail("Expiration", throttleInfo.expiration);
+					if((!self->autoThrottlingEnabled && throttleInfo.autoThrottled) || throttleInfo.expiration <= now()) { // TODO: keep or delete auto throttles when disabling auto-throttling
+						tr.clear(entry.key); // clear the stored key itself, not the prefix-stripped tag
+					}
+					else {
+						auto &oldItr = oldThrottleIterators[throttleInfo.priority]; // reference, so the scan position carries over to the next tag
+						while(oldItr.first != oldItr.second && oldItr.first->first < tag) {
+							++oldItr.first;
+						}
+
+						if(oldItr.first == oldItr.second || oldItr.first->first != tag || oldItr.first->second.rate < throttleInfo.rate * 0.95) {
+							TraceEvent("RatekeeperDetectedThrottle")
+								.detail("Tag", tag)
+								.detail("Rate", throttleInfo.rate)
+								.detail("Priority", capitalize(TagThrottleInfo::priorityToString(throttleInfo.priority)))
+								.detail("SecondsToExpiration", throttleInfo.expiration - now())
+								.detail("AutoThrottled", throttleInfo.autoThrottled);
+						}
+
+						newThrottles[throttleInfo.priority][tag] = throttleInfo;
+					}
+				}
+
+				self->tagThrottles = newThrottles;
+
+				state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
+				wait(tr.commit());
+				wait(watchFuture);
+				TraceEvent("RatekeeperThrottleSignaled");
+				break;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+	}
+}
+
+void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RatekeeperData::ThrottleMap& throttledTags) {
 	//double controlFactor = ;  // dt / eFoldingTime
 
 	double actualTps = self->smoothReleasedTransactions.smoothRate();
@@ -420,6 +529,29 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 
 		double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
 
+		// TODO: limit the number of throttles active for a storage server
+		if(storageQueue > targetBytes / 2.0 && ss.busiestTag.present() && ss.busiestTagFractionalBusyness > 0.2
+		   && ss.busiestTagRate > 1000 && throttledTags.size() < 10) {
+			auto &throttle = throttledTags[ss.busiestTag.get()]; // inserts a default (expiration 0) entry for a tag that is not in the map yet
+
+			double throttleRate = (storageQueue - targetBytes / 2.0) / targetBytes / 2.0;
+
+			if(throttle.expiration <= now() || throttle.rate < throttleRate * 0.95) {
+				TraceEvent((std::string("RatekeeperThrottlingTag") + limits->context).c_str()) // concatenation avoids passing context through format's %s
+					.detail("Tag", ss.busiestTag.get())
+					.detail("ThrottleRate", throttleRate);
+			}
+			// An expired (or new) throttle takes the fresh rate; a live one keeps the larger of its current and the fresh rate.
+			if(throttle.expiration <= now()) {
+				throttle.rate = throttleRate;
+			}
+			else {
+				throttle.rate = std::max(throttle.rate, throttleRate);
+			}
+			// Each qualifying sample renews the throttle for the knob-configured duration (120 seconds by default).
+			throttle.expiration = now() + SERVER_KNOBS->TAG_THROTTLE_DURATION;
+		}
+
 		double inputRate = ss.smoothInputBytes.smoothRate();
 		//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
 
@@ -687,14 +819,14 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 			.detail("LimitingStorageServerVersionLag", limitingVersionLag)
 			.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
 			.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
+			.detail("TagsThrottled", throttledTags.size())
 			.trackLatest(name);
 	}
 }
 
-ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo, DatabaseConfiguration* conf) {
-	state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
+ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
 	loop {
-		state ReadYourWritesTransaction tr(cx);
+		state ReadYourWritesTransaction tr(self->db);
 
 		loop {
 			try {
@@ -703,7 +835,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
 				Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
 				ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
 
-				conf->fromKeyValues( (VectorRef<KeyValueRef>) results );
+				self->configuration.fromKeyValues( (VectorRef<KeyValueRef>) results );
 
 				state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey);
 				wait( tr.commit() );
@@ -717,7 +849,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
 }
 
 ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
-	state RatekeeperData self;
+	state RatekeeperData self(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true));
 	state Future<Void> timeout = Void();
 	state std::vector<Future<Void>> tlogTrackers;
 	state std::vector<TLogInterface> tlogInterfs;
@@ -726,12 +858,14 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
 
 	TraceEvent("RatekeeperStarting", rkInterf.id());
 	self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) );
-	self.addActor.send( configurationMonitor(dbInfo, &self.configuration) );
+	self.addActor.send( configurationMonitor(&self) );
 
 	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
-	self.addActor.send( monitorServerListChange(&self, dbInfo, serverChanges) );
+	self.addActor.send( monitorServerListChange(&self, serverChanges) );
 	self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
 
+	self.addActor.send(monitorThrottlingChanges(&self));
+
 	TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
 		.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
 
@@ -748,8 +882,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
 		state bool lastLimited = false;
 		loop choose {
 			when (wait( timeout )) {
-				updateRate(&self, &self.normalLimits);
-				updateRate(&self, &self.batchLimits);
+				updateRate(&self, &self.normalLimits, self.tagThrottles[TagThrottleInfo::Priority::DEFAULT]);
+				updateRate(&self, &self.batchLimits, self.tagThrottles[TagThrottleInfo::Priority::BATCH]);
 
 				lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
 				double tooOld = now() - 1.0;
@@ -781,6 +915,19 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
 				reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
 				reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
 
+				// TODO: avoid iteration every time
+				for(auto priorityItr = self.tagThrottles.begin(); priorityItr != self.tagThrottles.end(); ++priorityItr) {
+					for(auto tagItr = priorityItr->second.begin(); tagItr != priorityItr->second.end();) {
+						if(tagItr->second.expiration > now()) {
+							reply.throttledTags[tagItr->first] = tagItr->second.rate;
+							++tagItr;
+						}
+						else {
+							tagItr = priorityItr->second.erase(tagItr);
+						}
+					}
+				}
+
 				reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
 				reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
 				reply.healthMetrics.batchLimited = lastLimited;
diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp
index 535f0e45a3..52e9da1636 100644
--- a/fdbserver/Status.actor.cpp
+++ b/fdbserver/Status.actor.cpp
@@ -1709,6 +1709,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
 		double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
 		double transPerSec = ratekeeper.getDouble("ReleasedTPS");
 		double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
+		int throttledTags = ratekeeper.getInt("TagsThrottled");
+		int batchThrottledTags = batchRatekeeper.getInt("TagsThrottled");
 		int ssCount = ratekeeper.getInt("StorageServers");
 		int tlogCount = ratekeeper.getInt("TLogs");
 		int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
@@ -1739,6 +1741,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
 		(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
 		(*qos)["released_transactions_per_second"] = transPerSec;
 		(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
+		(*qos)["tags_throttled"] = throttledTags;
+		(*qos)["batch_tags_throttled"] = batchThrottledTags;
 
 		JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
 		if(!perfLimit.empty()) {
diff --git a/flow/flow.cpp b/flow/flow.cpp
index 66feb0d126..fb1cf81f60 100644
--- a/flow/flow.cpp
+++ b/flow/flow.cpp
@@ -114,6 +114,45 @@ Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_un
 	return ret;
 }
 
+// Parses a duration of the form <non-negative integer><unit> and returns it in seconds. Units:
+// s - seconds
+// m - minutes
+// h - hours
+// d - days
+// defaultUnit is used when str has no unit suffix; an empty defaultUnit makes the suffix mandatory.
+// Returns an empty Optional if str is malformed, negative, or overflows uint64_t when scaled to seconds.
+Optional<uint64_t> parseDuration(std::string str, std::string defaultUnit) {
+	// strtoull negates '-'-prefixed input into a huge unsigned value; no valid duration contains '-'.
+	if (str.find('-') != std::string::npos) {
+		return Optional<uint64_t>();
+	}
+
+	char *endptr;
+	uint64_t ret = strtoull(str.c_str(), &endptr, 10);
+	if (endptr == str.c_str()) {
+		return Optional<uint64_t>(); // no digits were consumed
+	}
+
+	std::string unit = (*endptr == '\0') ? defaultUnit : std::string(endptr);
+	uint64_t secondsPerUnit;
+	if (unit == "s") {
+		secondsPerUnit = 1;
+	} else if (unit == "m") {
+		secondsPerUnit = 60;
+	} else if (unit == "h") {
+		secondsPerUnit = 60 * 60;
+	} else if (unit == "d") {
+		secondsPerUnit = 24 * 60 * 60;
+	} else {
+		return Optional<uint64_t>(); // unknown unit, or no suffix and no default unit
+	}
+
+	// Unsigned multiplication wraps on overflow; dividing back exposes the wrap.
+	uint64_t seconds = ret * secondsPerUnit;
+	if (seconds / secondsPerUnit != ret) return Optional<uint64_t>();
+	return seconds;
+}
+
 int vsformat( std::string &outputString, const char* form, va_list args) {
 	char buf[200];
 
diff --git a/flow/flow.h b/flow/flow.h
index 40d20740e5..92ea704f99 100644
--- a/flow/flow.h
+++ b/flow/flow.h
@@ -84,6 +84,7 @@ bool validationIsEnabled(BuggifyType type);
 #define EXPENSIVE_VALIDATION (validationIsEnabled(BuggifyType::General) && deterministicRandom()->random01() < P_EXPENSIVE_VALIDATION)
 
 extern Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit = "");
+extern Optional<uint64_t> parseDuration(std::string str, std::string defaultUnit = "");
 extern std::string format(const char* form, ...);
 
 // On success, returns the number of characters written. On failure, returns a negative number.