diff --git a/bindings/python/tests/fdbcli_tests.py b/bindings/python/tests/fdbcli_tests.py
index 97f79d35b6..35c2d21862 100755
--- a/bindings/python/tests/fdbcli_tests.py
+++ b/bindings/python/tests/fdbcli_tests.py
@@ -599,7 +599,7 @@ def tenants(logger):
     output = run_fdbcli_command('createtenant tenant')
     assert output == 'The tenant `tenant\' has been created'
 
-    output = run_fdbcli_command('createtenant tenant2')
+    output = run_fdbcli_command('createtenant tenant2 tenant_group=tenant_group2')
     assert output == 'The tenant `tenant2\' has been created'
 
     output = run_fdbcli_command('listtenants')
@@ -634,6 +634,61 @@ def tenants(logger):
     assert('printable' in json_output['tenant']['prefix'])
     assert(json_output['tenant']['tenant_state'] == 'ready')
 
+    output = run_fdbcli_command('gettenant tenant2')
+    lines = output.split('\n')
+    assert len(lines) == 3
+    assert lines[0].strip().startswith('id: ')
+    assert lines[1].strip().startswith('prefix: ')
+    assert lines[2].strip() == 'tenant group: tenant_group2'
+
+    output = run_fdbcli_command('gettenant tenant2 JSON')
+    json_output = json.loads(output, strict=False)
+    assert(len(json_output) == 2)
+    assert('tenant' in json_output)
+    assert(json_output['type'] == 'success')
+    assert(len(json_output['tenant']) == 3)
+    assert('id' in json_output['tenant'])
+    assert('prefix' in json_output['tenant'])
+    assert('tenant_group' in json_output['tenant'])
+    assert(len(json_output['tenant']['tenant_group']) == 2)
+    assert('base64' in json_output['tenant']['tenant_group'])
+    assert(json_output['tenant']['tenant_group']['printable'] == 'tenant_group2')
+
+    output = run_fdbcli_command('configuretenant tenant tenant_group=tenant_group1')
+    assert output == 'The configuration for tenant `tenant\' has been updated'
+
+    output = run_fdbcli_command('gettenant tenant')
+    lines = output.split('\n')
+    assert len(lines) == 3
+    assert lines[2].strip() == 'tenant group: tenant_group1'
+
+    output = run_fdbcli_command('configuretenant tenant tenant_group=tenant_group1 tenant_group=tenant_group2')
+    assert output == 'The configuration for tenant `tenant\' has been updated'
+
+    output = run_fdbcli_command('gettenant tenant')
+    lines = output.split('\n')
+    assert len(lines) == 3
+    assert lines[2].strip() == 'tenant group: tenant_group2'
+
+    output = run_fdbcli_command('configuretenant tenant unset tenant_group')
+    assert output == 'The configuration for tenant `tenant\' has been updated'
+
+    output = run_fdbcli_command('gettenant tenant')
+    lines = output.split('\n')
+    assert len(lines) == 2
+
+    output = run_fdbcli_command_and_get_error('configuretenant tenant unset')
+    assert output == 'ERROR: `unset\' specified without a configuration parameter.'
+
+    output = run_fdbcli_command_and_get_error('configuretenant tenant unset tenant_group=tenant_group1')
+    assert output == 'ERROR: unrecognized configuration parameter `tenant_group=tenant_group1\''
+
+    output = run_fdbcli_command_and_get_error('configuretenant tenant tenant_group')
+    assert output == 'ERROR: invalid configuration string `tenant_group\'. String must specify a value using `=\'.'
+
+    output = run_fdbcli_command_and_get_error('configuretenant tenant3 tenant_group=tenant_group1')
+    assert output == 'ERROR: Tenant does not exist (2131)'
+
     output = run_fdbcli_command('usetenant')
     assert output == 'Using the default tenant'
 
diff --git a/fdbcli/TenantCommands.actor.cpp b/fdbcli/TenantCommands.actor.cpp
index af46cb4965..cb93b6c210 100644
--- a/fdbcli/TenantCommands.actor.cpp
+++ b/fdbcli/TenantCommands.actor.cpp
@@ -35,20 +35,82 @@
 
 namespace fdb_cli {
 
-const KeyRangeRef tenantSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant/map/"),
-                                        LiteralStringRef("\xff\xff/management/tenant/map0"));
+const KeyRangeRef tenantMapSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant/map/"),
+                                           LiteralStringRef("\xff\xff/management/tenant/map0"));
+const KeyRangeRef tenantConfigSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant/configure/"),
+                                              LiteralStringRef("\xff\xff/management/tenant/configure0"));
+
+Optional<std::map<Standalone<StringRef>, Optional<Value>>>
+parseTenantConfiguration(std::vector<StringRef> const& tokens, int startIndex, bool allowUnset) {
+	std::map<Standalone<StringRef>, Optional<Value>> configParams;
+	for (int tokenNum = startIndex; tokenNum < tokens.size(); ++tokenNum) {
+		Optional<Value> value;
+
+		StringRef token = tokens[tokenNum];
+		StringRef param;
+		if (allowUnset && token == "unset"_sr) {
+			if (++tokenNum == tokens.size()) {
+				fmt::print(stderr, "ERROR: `unset' specified without a configuration parameter.\n");
+				return {};
+			}
+			param = tokens[tokenNum];
+		} else {
+			bool foundEquals;
+			param = token.eat("=", &foundEquals);
+			if (!foundEquals) {
+				fmt::print(stderr,
+				           "ERROR: invalid configuration string `{}'. String must specify a value using `='.\n",
+				           param.toString().c_str());
+				return {};
+			}
+			value = token;
+		}
+
+		if (tokencmp(param, "tenant_group")) {
+			configParams[param] = value;
+		} else {
+			fmt::print(stderr, "ERROR: unrecognized configuration parameter `{}'\n", param.toString().c_str());
+			return {};
+		}
+	}
+
+	return configParams;
+}
+
+Key makeConfigKey(TenantNameRef tenantName, StringRef configName) {
+	return tenantConfigSpecialKeyRange.begin.withSuffix(Tuple().append(tenantName).append(configName).pack());
+}
+
+void applyConfiguration(Reference<ITransaction> tr,
+                        TenantNameRef tenantName,
+                        std::map<Standalone<StringRef>, Optional<Value>> configuration) {
+	for (auto [configName, value] : configuration) {
+		if (value.present()) {
+			tr->set(makeConfigKey(tenantName, configName), value.get());
+		} else {
+			tr->clear(makeConfigKey(tenantName, configName));
+		}
+	}
+}
 
 // createtenant command
 ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
-	if (tokens.size() != 2) {
+	if (tokens.size() < 2 || tokens.size() > 3) {
 		printUsage(tokens[0]);
 		return false;
 	}
 
-	state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
+	state Key tenantNameKey = tenantMapSpecialKeyRange.begin.withSuffix(tokens[1]);
 	state Reference<ITransaction> tr = db->createTransaction();
 	state bool doneExistenceCheck = false;
 
+	state Optional<std::map<Standalone<StringRef>, Optional<Value>>> configuration =
+	    parseTenantConfiguration(tokens, 2, false);
+
+	if (!configuration.present()) {
+		return false;
+	}
+
 	loop {
 		tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
 		try {
@@ -63,12 +125,13 @@ ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector
 			}
 
 			tr->set(tenantNameKey, ValueRef());
+			applyConfiguration(tr, tokens[1], configuration.get());
 			wait(safeThreadFutureToFuture(tr->commit()));
 			break;
 		} catch (Error& e) {
 			state Error err(e);
 			if (e.code() == error_code_special_keys_api_failure) {
-				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				std::string errorMsgStr = wait(getSpecialKeysFailureErrorMessage(tr));
 				fmt::print(stderr, "ERROR: {}\n", errorMsgStr.c_str());
 				return false;
 			}
@@ -81,7 +144,7 @@ ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector
 }
 
 CommandFactory createTenantFactory("createtenant",
-                                   CommandHelp("createtenant <TENANT_NAME>",
+                                   CommandHelp("createtenant <TENANT_NAME> [tenant_group=<TENANT_GROUP>]",
                                                "creates a new tenant in the cluster",
                                                "Creates a new tenant in the cluster with the specified name."));
 
@@ -92,7 +155,7 @@ ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector
 		return false;
 	}
 
-	state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
+	state Key tenantNameKey = tenantMapSpecialKeyRange.begin.withSuffix(tokens[1]);
 	state Reference<ITransaction> tr = db->createTransaction();
 	state bool doneExistenceCheck = false;
 
@@ -115,7 +178,7 @@ ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector
 		} catch (Error& e) {
 			state Error err(e);
 			if (e.code() == error_code_special_keys_api_failure) {
-				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				std::string errorMsgStr = wait(getSpecialKeysFailureErrorMessage(tr));
 				fmt::print(stderr, "ERROR: {}\n", errorMsgStr.c_str());
 				return false;
 			}
@@ -157,14 +220,14 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
 	}
 	if (tokens.size() == 4) {
 		int n = 0;
-		if (sscanf(tokens[3].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[3].size()) {
-			fmt::print(stderr, "ERROR: invalid limit {}\n", tokens[3].toString().c_str());
+		if (sscanf(tokens[3].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[3].size() || limit <= 0) {
+			fmt::print(stderr, "ERROR: invalid limit `{}'\n", tokens[3].toString().c_str());
 			return false;
 		}
 	}
 
-	state Key beginTenantKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(beginTenant);
-	state Key endTenantKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(endTenant);
+	state Key beginTenantKey = tenantMapSpecialKeyRange.begin.withSuffix(beginTenant);
+	state Key endTenantKey = tenantMapSpecialKeyRange.begin.withSuffix(endTenant);
 	state Reference<ITransaction> tr = db->createTransaction();
 
 	loop {
@@ -184,16 +247,15 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
 
 			int index = 0;
 			for (auto tenant : tenants) {
-				fmt::print("  {}. {}\n",
-				           ++index,
-				           printable(tenant.key.removePrefix(fdb_cli::tenantSpecialKeyRange.begin)).c_str());
+				fmt::print(
+				    "  {}. {}\n", ++index, printable(tenant.key.removePrefix(tenantMapSpecialKeyRange.begin)).c_str());
 			}
 
 			return true;
 		} catch (Error& e) {
 			state Error err(e);
 			if (e.code() == error_code_special_keys_api_failure) {
-				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				std::string errorMsgStr = wait(getSpecialKeysFailureErrorMessage(tr));
 				fmt::print(stderr, "ERROR: {}\n", errorMsgStr.c_str());
 				return false;
 			}
@@ -217,7 +279,7 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
 	}
 
 	state bool useJson = tokens.size() == 3;
-	state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
+	state Key tenantNameKey = tenantMapSpecialKeyRange.begin.withSuffix(tokens[1]);
 	state Reference<ITransaction> tr = db->createTransaction();
 
 	loop {
@@ -245,6 +307,7 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
 				int64_t id;
 				std::string prefix;
 				std::string tenantState;
+				std::string tenantGroup;
 
 				doc.get("id", id);
 
@@ -255,10 +318,14 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
 				}
 
 				doc.get("tenant_state", tenantState);
+				bool hasTenantGroup = doc.tryGet("tenant_group.printable", tenantGroup);
 
 				fmt::print("  id: {}\n", id);
 				fmt::print("  prefix: {}\n", printable(prefix).c_str());
 				fmt::print("  tenant state: {}\n", printable(tenantState).c_str());
+				if (hasTenantGroup) {
+					fmt::print("  tenant group: {}\n", tenantGroup.c_str());
+				}
 			}
 
 			return true;
@@ -299,6 +366,50 @@ CommandFactory getTenantFactory(
                 "prints the metadata for a tenant",
                 "Prints the metadata for a tenant. If JSON is specified, then the output will be in JSON format."));
 
+// configuretenant command
+ACTOR Future<bool> configureTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
+	if (tokens.size() < 3) {
+		printUsage(tokens[0]);
+		return false;
+	}
+
+	state Optional<std::map<Standalone<StringRef>, Optional<Value>>> configuration =
+	    parseTenantConfiguration(tokens, 2, true);
+
+	if (!configuration.present()) {
+		return false;
+	}
+
+	state Reference<ITransaction> tr = db->createTransaction();
+
+	loop {
+		tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
+		try {
+			applyConfiguration(tr, tokens[1], configuration.get());
+			wait(safeThreadFutureToFuture(tr->commit()));
+			break;
+		} catch (Error& e) {
+			state Error err(e);
+			if (e.code() == error_code_special_keys_api_failure) {
+				std::string errorMsgStr = wait(getSpecialKeysFailureErrorMessage(tr));
+				fmt::print(stderr, "ERROR: {}\n", errorMsgStr.c_str());
+				return false;
+			}
+			wait(safeThreadFutureToFuture(tr->onError(err)));
+		}
+	}
+
+	fmt::print("The configuration for tenant `{}' has been updated\n", printable(tokens[1]).c_str());
+	return true;
+}
+
+CommandFactory configureTenantFactory(
+    "configuretenant",
+    CommandHelp("configuretenant <TENANT_NAME> <[unset] tenant_group[=<GROUP_NAME>]> ...",
+                "updates the configuration for a tenant",
+                "Updates the configuration for a tenant. Use `tenant_group=<GROUP_NAME>' to change the tenant group "
+                "that a tenant is assigned to or `unset tenant_group' to remove a tenant from its tenant group."));
+
 // renametenant command
 ACTOR Future<bool> renameTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
 	if (tokens.size() != 3) {
@@ -316,6 +427,6 @@ CommandFactory renameTenantFactory(
     "renametenant",
     CommandHelp(
         "renametenant <OLD_NAME> <NEW_NAME>",
-        "renames a tenant in the cluster.",
+        "renames a tenant in the cluster",
         "Renames a tenant in the cluster. The old name must exist and the new name must not exist in the cluster."));
 } // namespace fdb_cli
diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp
index b762c48e0a..26f4a8469c 100644
--- a/fdbcli/fdbcli.actor.cpp
+++ b/fdbcli/fdbcli.actor.cpp
@@ -1941,6 +1941,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 					continue;
 				}
 
+				if (tokencmp(tokens[0], "configuretenant")) {
+					bool _result = wait(makeInterruptable(configureTenantCommandActor(db, tokens)));
+					if (!_result)
+						is_error = true;
+					continue;
+				}
+
 				if (tokencmp(tokens[0], "renametenant")) {
 					bool _result = wait(makeInterruptable(renameTenantCommandActor(db, tokens)));
 					if (!_result)
diff --git a/fdbcli/include/fdbcli/fdbcli.actor.h b/fdbcli/include/fdbcli/fdbcli.actor.h
index ba754279e8..3b497b17d5 100644
--- a/fdbcli/include/fdbcli/fdbcli.actor.h
+++ b/fdbcli/include/fdbcli/fdbcli.actor.h
@@ -157,6 +157,8 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
                                          std::vector<StringRef> tokens,
                                          LineNoise* linenoise,
                                          Future<Void> warn);
+// configuretenant command
+ACTOR Future<bool> configureTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 // consistency command
 ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr,
                                                 std::vector<StringRef> tokens,
diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index 753a37ffa5..7f1dd70622 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -1624,11 +1624,6 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
 	return interf;
 }
 
-const KeyRangeRef tenantMapKeys("\xff/tenant/map/"_sr, "\xff/tenant/map0"_sr);
-const KeyRef tenantMapPrefix = tenantMapKeys.begin;
-const KeyRef tenantMapPrivatePrefix = "\xff\xff/tenant/map/"_sr;
-const KeyRef tenantLastIdKey = "\xff/tenant/lastId"_sr;
-
 const KeyRangeRef storageQuotaKeys(LiteralStringRef("\xff/storageQuota/"), LiteralStringRef("\xff/storageQuota0"));
 const KeyRef storageQuotaPrefix = storageQuotaKeys.begin;
 
diff --git a/fdbclient/Tenant.cpp b/fdbclient/Tenant.cpp
index c984e8008f..781bc30717 100644
--- a/fdbclient/Tenant.cpp
+++ b/fdbclient/Tenant.cpp
@@ -74,6 +74,11 @@ TenantMapEntry::TenantMapEntry() {}
 TenantMapEntry::TenantMapEntry(int64_t id, TenantState tenantState) : tenantState(tenantState) {
 	setId(id);
 }
+TenantMapEntry::TenantMapEntry(int64_t id, TenantState tenantState, Optional<TenantGroupName> tenantGroup)
+  : tenantState(tenantState), tenantGroup(tenantGroup) {
+	setId(id);
+}
+
 void TenantMapEntry::setId(int64_t id) {
 	ASSERT(id >= 0);
 	this->id = id;
@@ -100,9 +105,33 @@ std::string TenantMapEntry::toJson(int apiVersion) const {
 
 	tenantEntry["tenant_state"] = TenantMapEntry::tenantStateToString(tenantState);
 
+	if (tenantGroup.present()) {
+		json_spirit::mObject tenantGroupObject;
+		std::string encodedTenantGroup = base64::encoder::from_string(tenantGroup.get().toString());
+		// Remove trailing newline
+		encodedTenantGroup.resize(encodedTenantGroup.size() - 1);
+
+		tenantGroupObject["base64"] = encodedTenantGroup;
+		tenantGroupObject["printable"] = printable(tenantGroup.get());
+		tenantEntry["tenant_group"] = tenantGroupObject;
+	}
+
 	return json_spirit::write_string(json_spirit::mValue(tenantEntry));
 }
 
+bool TenantMapEntry::matchesConfiguration(TenantMapEntry const& other) const {
+	return tenantGroup == other.tenantGroup;
+}
+
+void TenantMapEntry::configure(Standalone<StringRef> parameter, Optional<Value> value) {
+	if (parameter == "tenant_group"_sr) {
+		tenantGroup = value;
+	} else {
+		TraceEvent(SevWarnAlways, "UnknownTenantConfigurationParameter").detail("Parameter", parameter);
+		throw invalid_tenant_configuration();
+	}
+}
+
 TEST_CASE("/fdbclient/TenantMapEntry/Serialization") {
 	TenantMapEntry entry1(1, TenantState::READY);
 	ASSERT(entry1.prefix == "\x00\x00\x00\x00\x00\x00\x00\x01"_sr);
diff --git a/fdbclient/TenantSpecialKeys.cpp b/fdbclient/TenantSpecialKeys.cpp
index 05d51b7e93..5570816684 100644
--- a/fdbclient/TenantSpecialKeys.cpp
+++ b/fdbclient/TenantSpecialKeys.cpp
@@ -31,3 +31,13 @@ const KeyRangeRef TenantRangeImpl<false>::submoduleRange = KeyRangeRef(""_sr, "\
 
 template <>
 const KeyRangeRef TenantRangeImpl<false>::mapSubRange = KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr);
+
+template <>
+bool TenantRangeImpl<true>::subRangeIntersects(KeyRangeRef subRange, KeyRangeRef range) {
+	return subRange.intersects(range);
+}
+
+template <>
+bool TenantRangeImpl<false>::subRangeIntersects(KeyRangeRef subRange, KeyRangeRef range) {
+	return subRange == mapSubRange;
+}
\ No newline at end of file
diff --git a/fdbclient/include/fdbclient/Tenant.h b/fdbclient/include/fdbclient/Tenant.h
index 3c4a2eddaf..133c1496e7 100644
--- a/fdbclient/include/fdbclient/Tenant.h
+++ b/fdbclient/include/fdbclient/Tenant.h
@@ -29,6 +29,8 @@
 
 typedef StringRef TenantNameRef;
 typedef Standalone<TenantNameRef> TenantName;
+typedef StringRef TenantGroupNameRef;
+typedef Standalone<TenantGroupNameRef> TenantGroupName;
 
 enum class TenantState { REGISTERING, READY, REMOVING, UPDATING_CONFIGURATION, ERROR };
 
@@ -44,16 +46,21 @@ struct TenantMapEntry {
 	int64_t id = -1;
 	Key prefix;
 	TenantState tenantState = TenantState::READY;
+	Optional<TenantGroupName> tenantGroup;
 
 	constexpr static int PREFIX_SIZE = sizeof(id);
 
 public:
 	TenantMapEntry();
 	TenantMapEntry(int64_t id, TenantState tenantState);
+	TenantMapEntry(int64_t id, TenantState tenantState, Optional<TenantGroupName> tenantGroup);
 
 	void setId(int64_t id);
 	std::string toJson(int apiVersion) const;
 
+	bool matchesConfiguration(TenantMapEntry const& other) const;
+	void configure(Standalone<StringRef> parameter, Optional<Value> value);
+
 	Value encode() const { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withTenants())); }
 
 	static TenantMapEntry decode(ValueRef const& value) {
@@ -65,7 +72,7 @@ public:
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, id, tenantState);
+		serializer(ar, id, tenantState, tenantGroup);
 		if constexpr (Ar::isDeserializing) {
 			if (id >= 0) {
 				prefix = idToPrefix(id);
@@ -75,15 +82,39 @@ public:
 	}
 };
 
+struct TenantGroupEntry {
+	constexpr static FileIdentifier file_identifier = 10764222;
+
+	TenantGroupEntry() = default;
+
+	Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withTenants())); }
+	static TenantGroupEntry decode(ValueRef const& value) {
+		TenantGroupEntry entry;
+		ObjectReader reader(value.begin(), IncludeVersion());
+		reader.deserialize(entry);
+		return entry;
+	}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar);
+	}
+};
+
 struct TenantMetadataSpecification {
 	static KeyRef subspace;
 
 	KeyBackedObjectMap<TenantName, TenantMapEntry, decltype(IncludeVersion()), NullCodec> tenantMap;
 	KeyBackedProperty<int64_t> lastTenantId;
+	KeyBackedSet<Tuple> tenantGroupTenantIndex;
+	KeyBackedObjectMap<TenantGroupName, TenantGroupEntry, decltype(IncludeVersion()), NullCodec> tenantGroupMap;
 
 	TenantMetadataSpecification(KeyRef subspace)
 	  : tenantMap(subspace.withSuffix("tenant/map/"_sr), IncludeVersion(ProtocolVersion::withTenants())),
-	    lastTenantId(subspace.withSuffix("tenant/lastId"_sr)) {}
+	    lastTenantId(subspace.withSuffix("tenant/lastId"_sr)),
+	    tenantGroupTenantIndex(subspace.withSuffix("tenant/tenantGroup/tenantIndex/"_sr)),
+	    tenantGroupMap(subspace.withSuffix("tenant/tenantGroup/map/"_sr),
+	                   IncludeVersion(ProtocolVersion::withTenants())) {}
 };
 
 struct TenantMetadata {
@@ -93,6 +124,8 @@ private:
 public:
 	static inline auto& tenantMap = instance.tenantMap;
 	static inline auto& lastTenantId = instance.lastTenantId;
+	static inline auto& tenantGroupTenantIndex = instance.tenantGroupTenantIndex;
+	static inline auto& tenantGroupMap = instance.tenantGroupMap;
 
 	static inline Key tenantMapPrivatePrefix = "\xff"_sr.withSuffix(tenantMap.subspace.begin);
 };
diff --git a/fdbclient/include/fdbclient/TenantManagement.actor.h b/fdbclient/include/fdbclient/TenantManagement.actor.h
index 70cac81801..d27844c49f 100644
--- a/fdbclient/include/fdbclient/TenantManagement.actor.h
+++ b/fdbclient/include/fdbclient/TenantManagement.actor.h
@@ -107,6 +107,10 @@ Future<std::pair<Optional<TenantMapEntry>, bool>> createTenantTransaction(Transa
 
 	state Future<Optional<TenantMapEntry>> existingEntryFuture = tryGetTenantTransaction(tr, name);
 	wait(checkTenantMode(tr));
+	state Future<Optional<TenantGroupEntry>> existingTenantGroupEntryFuture;
+	if (tenantEntry.tenantGroup.present()) {
+		existingTenantGroupEntryFuture = TenantMetadata::tenantGroupMap.get(tr, tenantEntry.tenantGroup.get());
+	}
 
 	Optional<TenantMapEntry> existingEntry = wait(existingEntryFuture);
 	if (existingEntry.present()) {
@@ -123,6 +127,15 @@ Future<std::pair<Optional<TenantMapEntry>, bool>> createTenantTransaction(Transa
 
 	tenantEntry.tenantState = TenantState::READY;
 	TenantMetadata::tenantMap.set(tr, name, tenantEntry);
+	if (tenantEntry.tenantGroup.present()) {
+		TenantMetadata::tenantGroupTenantIndex.insert(tr, Tuple::makeTuple(tenantEntry.tenantGroup.get(), name));
+
+		// Create the tenant group associated with this tenant if it doesn't already exist
+		Optional<TenantGroupEntry> existingTenantGroup = wait(existingTenantGroupEntryFuture);
+		if (!existingTenantGroup.present()) {
+			TenantMetadata::tenantGroupMap.set(tr, tenantEntry.tenantGroup.get(), TenantGroupEntry());
+		}
+	}
 
 	return std::make_pair(tenantEntry, true);
 }
@@ -182,6 +195,7 @@ Future<Optional<TenantMapEntry>> createTenant(Reference<DB> db,
 				    .detail("Tenant", name)
 				    .detail("TenantId", newTenant.first.get().id)
 				    .detail("Prefix", newTenant.first.get().prefix)
+				    .detail("TenantGroup", tenantEntry.tenantGroup)
 				    .detail("Version", tr->getCommittedVersion());
 			}
 
@@ -212,6 +226,19 @@ Future<Void> deleteTenantTransaction(Transaction tr,
 		}
 
 		TenantMetadata::tenantMap.erase(tr, name);
+		if (tenantEntry.get().tenantGroup.present()) {
+			TenantMetadata::tenantGroupTenantIndex.erase(tr,
+			                                             Tuple::makeTuple(tenantEntry.get().tenantGroup.get(), name));
+			KeyBackedSet<Tuple>::RangeResultType tenantsInGroup = wait(TenantMetadata::tenantGroupTenantIndex.getRange(
+			    tr,
+			    Tuple::makeTuple(tenantEntry.get().tenantGroup.get()),
+			    Tuple::makeTuple(keyAfter(tenantEntry.get().tenantGroup.get())),
+			    2));
+			if (tenantsInGroup.results.empty() ||
+			    (tenantsInGroup.results.size() == 1 && tenantsInGroup.results[0].getString(1) == name)) {
+				TenantMetadata::tenantGroupMap.erase(tr, tenantEntry.get().tenantGroup.get());
+			}
+		}
 	}
 
 	return Void();
@@ -247,6 +274,56 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name, Optional<int64_t> t
 	}
 }
 
+// This should only be called from a transaction that has already confirmed that the tenant entry
+// is present. The tenantEntry should start with the existing entry and modify only those fields that need
+// to be changed. This must only be called on a non-management cluster.
+ACTOR template <class Transaction>
+Future<Void> configureTenantTransaction(Transaction tr,
+                                        TenantNameRef tenantName,
+                                        TenantMapEntry originalEntry,
+                                        TenantMapEntry updatedTenantEntry) {
+	tr->setOption(FDBTransactionOptions::RAW_ACCESS);
+	TenantMetadata::tenantMap.set(tr, tenantName, updatedTenantEntry);
+
+	// If the tenant group was changed, we need to update the tenant group metadata structures
+	if (originalEntry.tenantGroup != updatedTenantEntry.tenantGroup) {
+		if (updatedTenantEntry.tenantGroup.present() && updatedTenantEntry.tenantGroup.get().startsWith("\xff"_sr)) {
+			throw invalid_tenant_group_name();
+		}
+		if (originalEntry.tenantGroup.present()) {
+			// Remove this tenant from the original tenant group index
+			TenantMetadata::tenantGroupTenantIndex.erase(tr,
+			                                             Tuple::makeTuple(originalEntry.tenantGroup.get(), tenantName));
+
+			// Check if the original tenant group is now empty. If so, remove the tenant group.
+			KeyBackedSet<Tuple>::RangeResultType tenants = wait(TenantMetadata::tenantGroupTenantIndex.getRange(
+			    tr,
+			    Tuple::makeTuple(originalEntry.tenantGroup.get()),
+			    Tuple::makeTuple(keyAfter(originalEntry.tenantGroup.get())),
+			    2));
+
+			if (tenants.results.empty() ||
+			    (tenants.results.size() == 1 && tenants.results[0].getString(1) == tenantName)) {
+				TenantMetadata::tenantGroupMap.erase(tr, originalEntry.tenantGroup.get());
+			}
+		}
+		if (updatedTenantEntry.tenantGroup.present()) {
+			// If this is creating a new tenant group, add it to the tenant group map
+			Optional<TenantGroupEntry> entry =
+			    wait(TenantMetadata::tenantGroupMap.get(tr, updatedTenantEntry.tenantGroup.get()));
+			if (!entry.present()) {
+				TenantMetadata::tenantGroupMap.set(tr, updatedTenantEntry.tenantGroup.get(), TenantGroupEntry());
+			}
+
+			// Insert this tenant in the tenant group index
+			TenantMetadata::tenantGroupTenantIndex.insert(
+			    tr, Tuple::makeTuple(updatedTenantEntry.tenantGroup.get(), tenantName));
+		}
+	}
+
+	return Void();
+}
+
 ACTOR template <class Transaction>
 Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenantsTransaction(Transaction tr,
                                                                                   TenantNameRef begin,
@@ -330,6 +407,14 @@ Future<Void> renameTenant(Reference<DB> db, TenantName oldName, TenantName newNa
 			TenantMetadata::tenantMap.erase(tr, oldName);
 			TenantMetadata::tenantMap.set(tr, newName, oldEntry.get());
 
+			// Update the tenant group index to reflect the new tenant name
+			if (oldEntry.get().tenantGroup.present()) {
+				TenantMetadata::tenantGroupTenantIndex.erase(
+				    tr, Tuple::makeTuple(oldEntry.get().tenantGroup.get(), oldName));
+				TenantMetadata::tenantGroupTenantIndex.insert(
+				    tr, Tuple::makeTuple(oldEntry.get().tenantGroup.get(), newName));
+			}
+
 			wait(safeThreadFutureToFuture(tr->commit()));
 			TraceEvent("RenameTenantSuccess").detail("OldName", oldName).detail("NewName", newName);
 			return Void();
diff --git a/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h b/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h
index e5d1f92941..0c58f2defd 100644
--- a/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h
+++ b/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h
@@ -31,14 +31,17 @@
 #include "fdbclient/DatabaseContext.h"
 #include "fdbclient/SpecialKeySpace.actor.h"
 #include "fdbclient/TenantManagement.actor.h"
+#include "fdbclient/Tuple.h"
 #include "fdbclient/libb64/encode.h"
 #include "flow/Arena.h"
 #include "flow/UnitTest.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
-template <bool HasSubRanges = true>
+template <bool HasSubRanges>
 class TenantRangeImpl : public SpecialKeyRangeRWImpl {
 private:
+	static bool subRangeIntersects(KeyRangeRef subRange, KeyRangeRef range);
+
 	static KeyRangeRef removePrefix(KeyRangeRef range, KeyRef prefix, KeyRef defaultEnd) {
 		KeyRef begin = range.begin.removePrefix(prefix);
 		KeyRef end;
@@ -53,15 +56,14 @@ private:
 
 	static KeyRef withTenantMapPrefix(KeyRef key, Arena& ar) {
 		int keySize = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
-		              TenantRangeImpl::submoduleRange.begin.size() + TenantRangeImpl::mapSubRange.begin.size() +
-		              key.size();
+		              submoduleRange.begin.size() + mapSubRange.begin.size() + key.size();
 
 		KeyRef prefixedKey = makeString(keySize, ar);
 		uint8_t* mutableKey = mutateString(prefixedKey);
 
 		mutableKey = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.copyTo(mutableKey);
-		mutableKey = TenantRangeImpl::submoduleRange.begin.copyTo(mutableKey);
-		mutableKey = TenantRangeImpl::mapSubRange.begin.copyTo(mutableKey);
+		mutableKey = submoduleRange.begin.copyTo(mutableKey);
+		mutableKey = mapSubRange.begin.copyTo(mutableKey);
 
 		key.copyTo(mutableKey);
 		return prefixedKey;
@@ -84,20 +86,21 @@ private:
 		return Void();
 	}
 
-	ACTOR static Future<RangeResult> getTenantRange(ReadYourWritesTransaction* ryw,
-	                                                KeyRangeRef kr,
-	                                                GetRangeLimits limitsHint) {
+	ACTOR template <bool B>
+	static Future<RangeResult> getTenantRange(ReadYourWritesTransaction* ryw,
+	                                          KeyRangeRef kr,
+	                                          GetRangeLimits limitsHint) {
 		state RangeResult results;
 
 		kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
-		         .removePrefix(TenantRangeImpl::submoduleRange.begin);
+		         .removePrefix(TenantRangeImpl<B>::submoduleRange.begin);
 
-		if (kr.intersects(TenantRangeImpl::mapSubRange)) {
+		if (kr.intersects(TenantRangeImpl<B>::mapSubRange)) {
 			GetRangeLimits limits = limitsHint;
 			limits.decrement(results);
 			wait(getTenantList(
 			    ryw,
-			    removePrefix(kr & TenantRangeImpl::mapSubRange, TenantRangeImpl::mapSubRange.begin, "\xff"_sr),
+			    removePrefix(kr & TenantRangeImpl<B>::mapSubRange, TenantRangeImpl<B>::mapSubRange.begin, "\xff"_sr),
 			    &results,
 			    limits));
 		}
@@ -105,15 +108,63 @@ private:
 		return results;
 	}
 
-	ACTOR static Future<Void> createTenants(ReadYourWritesTransaction* ryw, std::vector<TenantNameRef> tenants) {
+	static void applyTenantConfig(ReadYourWritesTransaction* ryw,
+	                              TenantNameRef tenantName,
+	                              std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> configEntries,
+	                              TenantMapEntry* tenantEntry) {
+
+		std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>::iterator configItr;
+		for (configItr = configEntries.begin(); configItr != configEntries.end(); ++configItr) {
+			if (configItr->first == "tenant_group"_sr) {
+				tenantEntry->tenantGroup = configItr->second;
+			} else {
+				TraceEvent(SevWarn, "InvalidTenantConfig")
+				    .detail("TenantName", tenantName)
+				    .detail("ConfigName", configItr->first);
+				ryw->setSpecialKeySpaceErrorMsg(
+				    ManagementAPIError::toJsonString(false,
+				                                     "set tenant configuration",
+				                                     format("invalid tenant configuration option `%s' for tenant `%s'",
+				                                            configItr->first.toString().c_str(),
+				                                            tenantName.toString().c_str())));
+				throw special_keys_api_failure();
+			}
+		}
+	}
+
+	ACTOR static Future<Void> createTenant(
+	    ReadYourWritesTransaction* ryw,
+	    TenantNameRef tenantName,
+	    std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> configMutations,
+	    int64_t tenantId,
+	    std::map<TenantGroupName, int>* tenantGroupNetTenantDelta) {
+		state TenantMapEntry tenantEntry;
+		tenantEntry.setId(tenantId);
+
+		for (auto const& [name, value] : configMutations) {
+			tenantEntry.configure(name, value);
+		}
+
+		if (tenantEntry.tenantGroup.present()) {
+			(*tenantGroupNetTenantDelta)[tenantEntry.tenantGroup.get()]++;
+		}
+
+		std::pair<Optional<TenantMapEntry>, bool> entry =
+		    wait(TenantAPI::createTenantTransaction(&ryw->getTransaction(), tenantName, tenantEntry));
+
+		return Void();
+	}
+
+	ACTOR static Future<Void> createTenants(
+	    ReadYourWritesTransaction* ryw,
+	    std::map<TenantName, std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> tenants,
+	    std::map<TenantGroupName, int>* tenantGroupNetTenantDelta) {
 		int64_t _nextId = wait(TenantAPI::getNextTenantId(&ryw->getTransaction()));
 		int64_t nextId = _nextId;
 
 		std::vector<Future<Void>> createFutures;
-		for (auto tenant : tenants) {
-			state TenantMapEntry tenantEntry(nextId++, TenantState::READY);
-			createFutures.push_back(
-			    success(TenantAPI::createTenantTransaction(&ryw->getTransaction(), tenant, tenantEntry)));
+		for (auto const& [tenant, config] : tenants) {
+			createFutures.push_back(createTenant(ryw, tenant, config, nextId++, tenantGroupNetTenantDelta));
 		}
 
 		TenantMetadata::lastTenantId.set(&ryw->getTransaction(), nextId - 1);
@@ -121,9 +172,49 @@ private:
 		return Void();
 	}
 
+	ACTOR static Future<Void> changeTenantConfig(
+	    ReadYourWritesTransaction* ryw,
+	    TenantName tenantName,
+	    std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> configEntries,
+	    std::map<TenantGroupName, int>* tenantGroupNetTenantDelta) {
+		TenantMapEntry originalEntry = wait(TenantAPI::getTenantTransaction(&ryw->getTransaction(), tenantName));
+		TenantMapEntry updatedEntry = originalEntry;
+		for (auto const& [name, value] : configEntries) {
+			updatedEntry.configure(name, value);
+		}
+
+		if (originalEntry.tenantGroup != updatedEntry.tenantGroup) {
+			if (originalEntry.tenantGroup.present()) {
+				(*tenantGroupNetTenantDelta)[originalEntry.tenantGroup.get()]--;
+			}
+			if (updatedEntry.tenantGroup.present()) {
+				(*tenantGroupNetTenantDelta)[updatedEntry.tenantGroup.get()]++;
+			}
+		}
+
+		wait(TenantAPI::configureTenantTransaction(&ryw->getTransaction(), tenantName, originalEntry, updatedEntry));
+		return Void();
+	}
+
+	ACTOR static Future<Void> deleteSingleTenant(ReadYourWritesTransaction* ryw,
+	                                             TenantName tenantName,
+	                                             std::map<TenantGroupName, int>* tenantGroupNetTenantDelta) {
+		state Optional<TenantMapEntry> tenantEntry =
+		    wait(TenantAPI::tryGetTenantTransaction(&ryw->getTransaction(), tenantName));
+		if (tenantEntry.present()) {
+			wait(TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
+			if (tenantEntry.get().tenantGroup.present()) {
+				(*tenantGroupNetTenantDelta)[tenantEntry.get().tenantGroup.get()]--;
+			}
+		}
+
+		return Void();
+	}
+
 	ACTOR static Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw,
 	                                            TenantName beginTenant,
-	                                            TenantName endTenant) {
+	                                            TenantName endTenant,
+	                                            std::map<TenantGroupName, int>* tenantGroupNetTenantDelta) {
 		state std::vector<std::pair<TenantName, TenantMapEntry>> tenants = wait(
 		    TenantAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
 
@@ -139,69 +230,154 @@ private:
 		std::vector<Future<Void>> deleteFutures;
 		for (auto tenant : tenants) {
 			deleteFutures.push_back(TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenant.first));
+			if (tenant.second.tenantGroup.present()) {
+				(*tenantGroupNetTenantDelta)[tenant.second.tenantGroup.get()]--;
+			}
 		}
+
 		wait(waitForAll(deleteFutures));
+		return Void();
+	}
+
+	// Check if the number of tenants in the tenant group is equal to the net reduction in the number of tenants.
+	// If it is, then we can delete the tenant group.
+	ACTOR static Future<Void> checkAndRemoveTenantGroup(ReadYourWritesTransaction* ryw,
+	                                                    TenantGroupName tenantGroup,
+	                                                    int tenantDelta) {
+		ASSERT(tenantDelta < 0);
+		state int removedTenants = -tenantDelta;
+		KeyBackedSet<Tuple>::RangeResultType tenantsInGroup =
+		    wait(TenantMetadata::tenantGroupTenantIndex.getRange(&ryw->getTransaction(),
+		                                                         Tuple::makeTuple(tenantGroup),
+		                                                         Tuple::makeTuple(keyAfter(tenantGroup)),
+		                                                         removedTenants + 1));
+
+		ASSERT(tenantsInGroup.results.size() >= removedTenants);
+		if (tenantsInGroup.results.size() == removedTenants) {
+			TenantMetadata::tenantGroupMap.erase(&ryw->getTransaction(), tenantGroup);
+		}
 
 		return Void();
 	}
 
 public:
+	// These ranges vary based on the template parameter
 	const static KeyRangeRef submoduleRange;
 	const static KeyRangeRef mapSubRange;
 
+	// These sub-ranges should only be used if HasSubRanges=true
+	const inline static KeyRangeRef configureSubRange = KeyRangeRef("configure/"_sr, "configure0"_sr);
+
 	explicit TenantRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
 
 	Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
 	                             KeyRangeRef kr,
 	                             GetRangeLimits limitsHint) const override {
-		return getTenantRange(ryw, kr, limitsHint);
+		return getTenantRange<HasSubRanges>(ryw, kr, limitsHint);
 	}
 
-	Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override {
-		auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
-		std::vector<Future<Void>> tenantManagementFutures;
+	ACTOR static Future<Optional<std::string>> commitImpl(TenantRangeImpl* self, ReadYourWritesTransaction* ryw) {
+		state std::vector<Future<Void>> tenantManagementFutures;
 
-		std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
+		// This map is an ugly workaround to the fact that we cannot use RYW in these transactions.
+		// It tracks the net change to the number of tenants in a tenant group, and at the end we can compare
+		// that with how many tenants the tenant group started with. If we removed all of the tenants, then we
+		// delete the tenant group.
+		//
+		// SOMEDAY: enable RYW support in special keys and remove this complexity.
+		state std::map<TenantGroupName, int> tenantGroupNetTenantDelta;
+
+		state KeyRangeMap<std::pair<bool, Optional<Value>>>::Ranges ranges =
+		    ryw->getSpecialKeySpaceWriteMap().containedRanges(self->range);
+
+		state std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
+		state std::map<TenantName, std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> configMutations;
+
+		tenantManagementFutures.push_back(TenantAPI::checkTenantMode(&ryw->getTransaction()));
 
 		for (auto range : ranges) {
 			if (!range.value().first) {
 				continue;
 			}
 
-			KeyRangeRef adjustedRange =
+			state KeyRangeRef adjustedRange =
 			    range.range()
 			        .removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
 			        .removePrefix(submoduleRange.begin);
 
-			if (mapSubRange.intersects(adjustedRange)) {
+			if (subRangeIntersects(mapSubRange, adjustedRange)) {
 				adjustedRange = mapSubRange & adjustedRange;
 				adjustedRange = removePrefix(adjustedRange, mapSubRange.begin, "\xff"_sr);
 				mapMutations.push_back(std::make_pair(adjustedRange, range.value().second));
+			} else if (subRangeIntersects(configureSubRange, adjustedRange) && adjustedRange.singleKeyRange()) {
+				StringRef configTupleStr = adjustedRange.begin.removePrefix(configureSubRange.begin);
+				try {
+					Tuple tuple = Tuple::unpack(configTupleStr);
+					if (tuple.size() != 2) {
+						throw invalid_tuple_index();
+					}
+					configMutations[tuple.getString(0)].push_back(
+					    std::make_pair(tuple.getString(1), range.value().second));
+				} catch (Error& e) {
+					TraceEvent(SevWarn, "InvalidTenantConfigurationKey").error(e).detail("Key", adjustedRange.begin);
+					ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
+					    false, "configure tenant", "invalid tenant configuration key"));
+					throw special_keys_api_failure();
+				}
 			}
 		}
 
-		std::vector<TenantNameRef> tenantsToCreate;
+		std::map<TenantName, std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> tenantsToCreate;
 		for (auto mapMutation : mapMutations) {
 			TenantNameRef tenantName = mapMutation.first.begin;
 			if (mapMutation.second.present()) {
-				tenantsToCreate.push_back(tenantName);
+				std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> createMutations;
+				auto itr = configMutations.find(tenantName);
+				if (itr != configMutations.end()) {
+					createMutations = itr->second;
+					configMutations.erase(itr);
+				}
+				tenantsToCreate[tenantName] = createMutations;
 			} else {
 				// For a single key clear, just issue the delete
 				if (mapMutation.first.singleKeyRange()) {
-					tenantManagementFutures.push_back(
-					    TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
+					tenantManagementFutures.push_back(deleteSingleTenant(ryw, tenantName, &tenantGroupNetTenantDelta));
+
+					// Configuration changes made to a deleted tenant are discarded
+					configMutations.erase(tenantName);
 				} else {
-					tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, mapMutation.first.end));
+					tenantManagementFutures.push_back(
+					    deleteTenantRange(ryw, tenantName, mapMutation.first.end, &tenantGroupNetTenantDelta));
+
+					// Configuration changes made to a deleted tenant are discarded
+					configMutations.erase(configMutations.lower_bound(tenantName),
+					                      configMutations.lower_bound(mapMutation.first.end));
 				}
 			}
 		}
 
 		if (!tenantsToCreate.empty()) {
-			tenantManagementFutures.push_back(createTenants(ryw, tenantsToCreate));
+			tenantManagementFutures.push_back(createTenants(ryw, tenantsToCreate, &tenantGroupNetTenantDelta));
+		}
+		for (auto configMutation : configMutations) {
+			tenantManagementFutures.push_back(
+			    changeTenantConfig(ryw, configMutation.first, configMutation.second, &tenantGroupNetTenantDelta));
 		}
 
-		return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
+		wait(waitForAll(tenantManagementFutures));
+
+		state std::vector<Future<Void>> tenantGroupUpdateFutures;
+		for (auto [tenantGroup, count] : tenantGroupNetTenantDelta) {
+			if (count < 0) {
+				tenantGroupUpdateFutures.push_back(checkAndRemoveTenantGroup(ryw, tenantGroup, count));
+			}
+		}
+
+		wait(waitForAll(tenantGroupUpdateFutures));
+		return Optional<std::string>();
 	}
+
+	Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override { return commitImpl(this, ryw); }
 };
 
 #include "flow/unactorcompiler.h"
diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp
index 869bdef545..0ba08722f7 100644
--- a/fdbserver/tester.actor.cpp
+++ b/fdbserver/tester.actor.cpp
@@ -1633,8 +1633,12 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
 	if (useDB) {
 		std::vector<Future<Void>> tenantFutures;
 		for (auto tenant : tenantsToCreate) {
-			TraceEvent("CreatingTenant").detail("Tenant", tenant);
-			tenantFutures.push_back(success(TenantAPI::createTenant(cx.getReference(), tenant)));
+			TenantMapEntry entry;
+			if (deterministicRandom()->coinflip()) {
+				entry.tenantGroup = "TestTenantGroup"_sr;
+			}
+			TraceEvent("CreatingTenant").detail("Tenant", tenant).detail("TenantGroup", entry.tenantGroup);
+			tenantFutures.push_back(success(TenantAPI::createTenant(cx.getReference(), tenant, entry)));
 		}
 
 		wait(waitForAll(tenantFutures));
diff --git a/fdbserver/workloads/FuzzApiCorrectness.actor.cpp b/fdbserver/workloads/FuzzApiCorrectness.actor.cpp
index 390ff6f295..27000783b7 100644
--- a/fdbserver/workloads/FuzzApiCorrectness.actor.cpp
+++ b/fdbserver/workloads/FuzzApiCorrectness.actor.cpp
@@ -131,6 +131,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
 	std::vector<Reference<ITenant>> tenants;
 	std::set<TenantName> createdTenants;
 	int numTenants;
+	int numTenantGroups;
 
 	// Map from tenant number to key prefix
 	std::map<int, std::string> keyPrefixes;
@@ -154,6 +155,9 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
 		int maxTenants = getOption(options, "numTenants"_sr, 4);
 		numTenants = deterministicRandom()->randomInt(0, maxTenants + 1);
 
+		int maxTenantGroups = getOption(options, "numTenantGroups"_sr, numTenants);
+		numTenantGroups = deterministicRandom()->randomInt(0, maxTenantGroups + 1);
+
 		// See https://github.com/apple/foundationdb/issues/2424
 		if (BUGGIFY) {
 			enableBuggify(true, BuggifyType::Client);
@@ -206,6 +210,14 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
 	std::string description() const override { return "FuzzApiCorrectness"; }
 
 	static TenantName getTenant(int num) { return TenantNameRef(format("tenant_%d", num)); }
+	Optional<TenantGroupName> getTenantGroup(int num) {
+		int groupNum = num % (numTenantGroups + 1);
+		if (groupNum == numTenantGroups - 1) {
+			return Optional<TenantGroupName>();
+		} else {
+			return TenantGroupNameRef(format("tenantgroup_%d", groupNum));
+		}
+	}
 	bool canUseTenant(Optional<TenantName> tenant) { return !tenant.present() || createdTenants.count(tenant.get()); }
 
 	Future<Void> setup(Database const& cx) override {
@@ -226,7 +238,9 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
 
 			// The last tenant will not be created
 			if (i < self->numTenants) {
-				tenantFutures.push_back(::success(TenantAPI::createTenant(cx.getReference(), tenantName)));
+				TenantMapEntry entry;
+				entry.tenantGroup = self->getTenantGroup(i);
+				tenantFutures.push_back(::success(TenantAPI::createTenant(cx.getReference(), tenantName, entry)));
 				self->createdTenants.insert(tenantName);
 			}
 		}
diff --git a/fdbserver/workloads/TenantManagementWorkload.actor.cpp b/fdbserver/workloads/TenantManagementWorkload.actor.cpp
index ae4ea1fbf7..17f25d68ef 100644
--- a/fdbserver/workloads/TenantManagementWorkload.actor.cpp
+++ b/fdbserver/workloads/TenantManagementWorkload.actor.cpp
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <limits>
 #include "fdbclient/FDBOptions.g.h"
+#include "fdbclient/RunTransaction.actor.h"
 #include "fdbclient/TenantManagement.actor.h"
 #include "fdbclient/TenantSpecialKeys.actor.h"
 #include "fdbclient/libb64/decode.h"
@@ -35,25 +36,37 @@
 struct TenantManagementWorkload : TestWorkload {
 	struct TenantData {
 		int64_t id;
+		Optional<TenantGroupName> tenantGroup;
 		bool empty;
 
 		TenantData() : id(-1), empty(true) {}
-		TenantData(int64_t id, bool empty) : id(id), empty(empty) {}
+		TenantData(int64_t id, Optional<TenantGroupName> tenantGroup, bool empty)
+		  : id(id), tenantGroup(tenantGroup), empty(empty) {}
+	};
+
+	struct TenantGroupData {
+		int64_t tenantCount = 0;
 	};
 
 	std::map<TenantName, TenantData> createdTenants;
+	std::map<TenantGroupName, TenantGroupData> createdTenantGroups;
 	int64_t maxId = -1;
 
 	const Key keyName = "key"_sr;
 	const Value noTenantValue = "no_tenant"_sr;
 	const TenantName tenantNamePrefix = "tenant_management_workload_"_sr;
 	TenantName localTenantNamePrefix;
+	TenantName localTenantGroupNamePrefix;
 
 	const Key specialKeysTenantMapPrefix = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT)
 	                                           .begin.withSuffix(TenantRangeImpl<true>::submoduleRange.begin)
 	                                           .withSuffix(TenantRangeImpl<true>::mapSubRange.begin);
+	const Key specialKeysTenantConfigPrefix = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT)
+	                                              .begin.withSuffix(TenantRangeImpl<true>::submoduleRange.begin)
+	                                              .withSuffix(TenantRangeImpl<true>::configureSubRange.begin);
 
 	int maxTenants;
+	int maxTenantGroups;
 	double testDuration;
 
 	enum class OperationType { SPECIAL_KEYS, MANAGEMENT_DATABASE, MANAGEMENT_TRANSACTION };
@@ -71,9 +84,11 @@ struct TenantManagementWorkload : TestWorkload {
 
 	TenantManagementWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
 		maxTenants = std::min<int>(1e8 - 1, getOption(options, "maxTenants"_sr, 1000));
+		maxTenantGroups = std::min<int>(2 * maxTenants, getOption(options, "maxTenantGroups"_sr, 20));
 		testDuration = getOption(options, "testDuration"_sr, 60.0);
 
 		localTenantNamePrefix = format("%stenant_%d_", tenantNamePrefix.toString().c_str(), clientId);
+		localTenantGroupNamePrefix = format("%stenantgroup_%d_", tenantNamePrefix.toString().c_str(), clientId);
 	}
 
 	std::string description() const override { return "TenantManagement"; }
@@ -107,7 +122,60 @@ struct TenantManagementWorkload : TestWorkload {
 		return tenant;
 	}
 
-	ACTOR Future<Void> createTenant(Database cx, TenantManagementWorkload* self) {
+	Optional<TenantGroupName> chooseTenantGroup(bool allowSystemTenantGroup) {
+		Optional<TenantGroupName> tenantGroup;
+		if (deterministicRandom()->coinflip()) {
+			tenantGroup = TenantGroupNameRef(format("%s%08d",
+			                                        localTenantGroupNamePrefix.toString().c_str(),
+			                                        deterministicRandom()->randomInt(0, maxTenantGroups)));
+			if (allowSystemTenantGroup && deterministicRandom()->random01() < 0.02) {
+				tenantGroup = tenantGroup.get().withPrefix("\xff"_sr);
+			}
+		}
+
+		return tenantGroup;
+	}
+
+	// Creates tenant(s) using the specified operation type
+	ACTOR static Future<Void> createImpl(Database cx,
+	                                     Reference<ReadYourWritesTransaction> tr,
+	                                     std::map<TenantName, TenantMapEntry> tenantsToCreate,
+	                                     OperationType operationType,
+	                                     TenantManagementWorkload* self) {
+		if (operationType == OperationType::SPECIAL_KEYS) {
+			tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
+			for (auto [tenant, entry] : tenantsToCreate) {
+				tr->set(self->specialKeysTenantMapPrefix.withSuffix(tenant), ""_sr);
+				if (entry.tenantGroup.present()) {
+					tr->set(self->specialKeysTenantConfigPrefix.withSuffix(
+					            Tuple().append(tenant).append("tenant_group"_sr).pack()),
+					        entry.tenantGroup.get());
+				}
+			}
+			wait(tr->commit());
+		} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
+			ASSERT(tenantsToCreate.size() == 1);
+			wait(success(TenantAPI::createTenant(
+			    cx.getReference(), tenantsToCreate.begin()->first, tenantsToCreate.begin()->second)));
+		} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
+			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			int64_t _nextId = wait(TenantAPI::getNextTenantId(tr));
+			int64_t nextId = _nextId;
+
+			std::vector<Future<Void>> createFutures;
+			for (auto [tenant, entry] : tenantsToCreate) {
+				entry.setId(nextId++);
+				createFutures.push_back(success(TenantAPI::createTenantTransaction(tr, tenant, entry)));
+			}
+			TenantMetadata::lastTenantId.set(tr, nextId - 1);
+			wait(waitForAll(createFutures));
+			wait(tr->commit());
+		}
+
+		return Void();
+	}
+
+	ACTOR static Future<Void> createTenant(Database cx, TenantManagementWorkload* self) {
 		state OperationType operationType = TenantManagementWorkload::randomOperationType();
 		int numTenants = 1;
 
@@ -116,72 +184,79 @@ struct TenantManagementWorkload : TestWorkload {
 			numTenants = deterministicRandom()->randomInt(1, 5);
 		}
 
+		// Tracks whether any tenant exists in the database or not. This variable is updated if we have to retry
+		// the creation.
 		state bool alreadyExists = false;
+
+		// True if any tenant name starts with \xff
 		state bool hasSystemTenant = false;
 
-		state std::set<TenantName> tenantsToCreate;
+		// True if any tenant group name starts with \xff
+		state bool hasSystemTenantGroup = false;
+
+		state std::map<TenantName, TenantMapEntry> tenantsToCreate;
 		for (int i = 0; i < numTenants; ++i) {
 			TenantName tenant = self->chooseTenantName(true);
-			tenantsToCreate.insert(tenant);
+			while (tenantsToCreate.count(tenant)) {
+				tenant = self->chooseTenantName(true);
+			}
+
+			TenantMapEntry entry;
+			entry.tenantGroup = self->chooseTenantGroup(true);
+			tenantsToCreate[tenant] = entry;
 
 			alreadyExists = alreadyExists || self->createdTenants.count(tenant);
 			hasSystemTenant = hasSystemTenant || tenant.startsWith("\xff"_sr);
+			hasSystemTenantGroup = hasSystemTenantGroup || entry.tenantGroup.orDefault(""_sr).startsWith("\xff"_sr);
 		}
 
 		state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
 
 		loop {
 			try {
-				if (operationType == OperationType::SPECIAL_KEYS) {
-					tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
-					for (auto tenant : tenantsToCreate) {
-						tr->set(self->specialKeysTenantMapPrefix.withSuffix(tenant), ""_sr);
-					}
-					wait(tr->commit());
-				} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
-					ASSERT(tenantsToCreate.size() == 1);
-					wait(success(TenantAPI::createTenant(cx.getReference(), *tenantsToCreate.begin())));
-				} else {
-					tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-
-					int64_t _nextId = wait(TenantAPI::getNextTenantId(tr));
-					int64_t nextId = _nextId;
-
-					std::vector<Future<Void>> createFutures;
-					for (auto tenant : tenantsToCreate) {
-						TenantMapEntry tenantEntry(nextId++, TenantState::READY);
-						createFutures.push_back(success(TenantAPI::createTenantTransaction(tr, tenant, tenantEntry)));
-					}
-					TenantMetadata::lastTenantId.set(tr, nextId - 1);
-					wait(waitForAll(createFutures));
-					wait(tr->commit());
-				}
+				wait(createImpl(cx, tr, tenantsToCreate, operationType, self));
 
 				if (operationType == OperationType::MANAGEMENT_DATABASE) {
 					ASSERT(!alreadyExists);
 				}
 
+				// It is not legal to create a tenant or tenant group starting with \xff
 				ASSERT(!hasSystemTenant);
+				ASSERT(!hasSystemTenantGroup);
 
-				state std::set<TenantName>::iterator tenantItr;
+				state std::map<TenantName, TenantMapEntry>::iterator tenantItr;
 				for (tenantItr = tenantsToCreate.begin(); tenantItr != tenantsToCreate.end(); ++tenantItr) {
-					if (self->createdTenants.count(*tenantItr)) {
+					// Ignore any tenants that already existed
+					if (self->createdTenants.count(tenantItr->first)) {
 						continue;
 					}
 
-					state Optional<TenantMapEntry> entry = wait(TenantAPI::tryGetTenant(cx.getReference(), *tenantItr));
+					// Read the created tenant object and verify that its state is correct
+					state Optional<TenantMapEntry> entry =
+					    wait(TenantAPI::tryGetTenant(cx.getReference(), tenantItr->first));
 					ASSERT(entry.present());
 					ASSERT(entry.get().id > self->maxId);
+					ASSERT(entry.get().tenantGroup == tenantItr->second.tenantGroup);
+					ASSERT(entry.get().tenantState == TenantState::READY);
 
+					// Update our local tenant state to include the newly created one
 					self->maxId = entry.get().id;
-					self->createdTenants[*tenantItr] = TenantData(entry.get().id, true);
+					self->createdTenants[tenantItr->first] =
+					    TenantData(entry.get().id, tenantItr->second.tenantGroup, true);
 
+					// If this tenant has a tenant group, create or update the entry for it
+					if (tenantItr->second.tenantGroup.present()) {
+						self->createdTenantGroups[tenantItr->second.tenantGroup.get()].tenantCount++;
+					}
+
+					// Randomly decide to insert a key into the tenant
 					state bool insertData = deterministicRandom()->random01() < 0.5;
 					if (insertData) {
-						state Transaction insertTr(cx, *tenantItr);
+						state Transaction insertTr(cx, tenantItr->first);
 						loop {
 							try {
-								insertTr.set(self->keyName, *tenantItr);
+								// The value stored in the key will be the name of the tenant
+								insertTr.set(self->keyName, tenantItr->first);
 								wait(insertTr.commit());
 								break;
 							} catch (Error& e) {
@@ -189,15 +264,17 @@ struct TenantManagementWorkload : TestWorkload {
 							}
 						}
 
-						self->createdTenants[*tenantItr].empty = false;
+						self->createdTenants[tenantItr->first].empty = false;
 
+						// Make sure that the key inserted correctly concatenates the tenant prefix with the
+						// relative key
 						state Transaction checkTr(cx);
 						loop {
 							try {
 								checkTr.setOption(FDBTransactionOptions::RAW_ACCESS);
 								Optional<Value> val = wait(checkTr.get(self->keyName.withPrefix(entry.get().prefix)));
 								ASSERT(val.present());
-								ASSERT(val.get() == *tenantItr);
+								ASSERT(val.get() == tenantItr->first);
 								break;
 							} catch (Error& e) {
 								wait(checkTr.onError(e));
@@ -205,28 +282,39 @@ struct TenantManagementWorkload : TestWorkload {
 						}
 					}
 
-					wait(self->checkTenant(cx, self, *tenantItr, self->createdTenants[*tenantItr]));
+					// Perform some final tenant validation
+					wait(checkTenantContents(cx, self, tenantItr->first, self->createdTenants[tenantItr->first]));
 				}
+
 				return Void();
 			} catch (Error& e) {
 				if (e.code() == error_code_invalid_tenant_name) {
 					ASSERT(hasSystemTenant);
 					return Void();
-				} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
+				} else if (e.code() == error_code_invalid_tenant_group_name) {
+					ASSERT(hasSystemTenantGroup);
+					return Void();
+				}
+
+				// Database-based operations should not need to be retried
+				else if (operationType == OperationType::MANAGEMENT_DATABASE) {
 					if (e.code() == error_code_tenant_already_exists) {
 						ASSERT(alreadyExists && operationType == OperationType::MANAGEMENT_DATABASE);
 					} else {
 						ASSERT(tenantsToCreate.size() == 1);
 						TraceEvent(SevError, "CreateTenantFailure")
 						    .error(e)
-						    .detail("TenantName", *tenantsToCreate.begin());
+						    .detail("TenantName", tenantsToCreate.begin()->first);
 					}
 					return Void();
-				} else {
+				}
+
+				// Transaction-based operations should be retried
+				else {
 					try {
 						wait(tr->onError(e));
 					} catch (Error& e) {
-						for (auto tenant : tenantsToCreate) {
+						for (auto [tenant, _] : tenantsToCreate) {
 							TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
 						}
 						return Void();
@@ -236,11 +324,48 @@ struct TenantManagementWorkload : TestWorkload {
 		}
 	}
 
-	ACTOR Future<Void> deleteTenant(Database cx, TenantManagementWorkload* self) {
+	ACTOR static Future<Void> deleteImpl(Database cx,
+	                                     Reference<ReadYourWritesTransaction> tr,
+	                                     TenantName beginTenant,
+	                                     Optional<TenantName> endTenant,
+	                                     std::vector<TenantName> tenants,
+	                                     OperationType operationType,
+	                                     TenantManagementWorkload* self) {
+		state int tenantIndex;
+		if (operationType == OperationType::SPECIAL_KEYS) {
+			tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
+			Key key = self->specialKeysTenantMapPrefix.withSuffix(beginTenant);
+			if (endTenant.present()) {
+				tr->clear(KeyRangeRef(key, self->specialKeysTenantMapPrefix.withSuffix(endTenant.get())));
+			} else {
+				tr->clear(key);
+			}
+			wait(tr->commit());
+		} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
+			ASSERT(tenants.size() == 1);
+			for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
+				wait(TenantAPI::deleteTenant(cx.getReference(), tenants[tenantIndex]));
+			}
+		} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
+			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			std::vector<Future<Void>> deleteFutures;
+			for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
+				deleteFutures.push_back(TenantAPI::deleteTenantTransaction(tr, tenants[tenantIndex]));
+			}
+
+			wait(waitForAll(deleteFutures));
+			wait(tr->commit());
+		}
+
+		return Void();
+	}
+
+	ACTOR static Future<Void> deleteTenant(Database cx, TenantManagementWorkload* self) {
 		state TenantName beginTenant = self->chooseTenantName(true);
 		state OperationType operationType = TenantManagementWorkload::randomOperationType();
 		state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
 
+		// For transaction-based deletion, we randomly allow the deletion of a range of tenants
 		state Optional<TenantName> endTenant = operationType != OperationType::MANAGEMENT_DATABASE &&
 		                                               !beginTenant.startsWith("\xff"_sr) &&
 		                                               deterministicRandom()->random01() < 0.2
@@ -254,9 +379,15 @@ struct TenantManagementWorkload : TestWorkload {
 		}
 
 		auto itr = self->createdTenants.find(beginTenant);
+
+		// True if the beginTenant should exist and be deletable. This is updated if a deletion fails and gets
+		// retried.
 		state bool alreadyExists = itr != self->createdTenants.end();
+
+		// True if all of the tenants in the range are empty and can be deleted
 		state bool isEmpty = true;
 
+		// Collect a list of all tenants that we expect should be deleted by this operation
 		state std::vector<TenantName> tenants;
 		if (!endTenant.present()) {
 			tenants.push_back(beginTenant);
@@ -268,10 +399,12 @@ struct TenantManagementWorkload : TestWorkload {
 			}
 		}
 
+		// Check whether each tenant is empty.
 		state int tenantIndex;
 		try {
 			if (alreadyExists || endTenant.present()) {
 				for (tenantIndex = 0; tenantIndex < tenants.size(); ++tenantIndex) {
+					// For most tenants, we will delete the contents and make them empty
 					if (deterministicRandom()->random01() < 0.9) {
 						state Transaction clearTr(cx, tenants[tenantIndex]);
 						loop {
@@ -286,7 +419,9 @@ struct TenantManagementWorkload : TestWorkload {
 								wait(clearTr.onError(e));
 							}
 						}
-					} else {
+					}
+					// Otherwise, we will just report the current emptiness of the tenant
+					else {
 						auto itr = self->createdTenants.find(tenants[tenantIndex]);
 						ASSERT(itr != self->createdTenants.end());
 						isEmpty = isEmpty && itr->second.empty;
@@ -303,38 +438,34 @@ struct TenantManagementWorkload : TestWorkload {
 
 		loop {
 			try {
-				if (operationType == OperationType::SPECIAL_KEYS) {
-					tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
-					Key key = self->specialKeysTenantMapPrefix.withSuffix(beginTenant);
-					if (endTenant.present()) {
-						tr->clear(KeyRangeRef(key, self->specialKeysTenantMapPrefix.withSuffix(endTenant.get())));
-					} else {
-						tr->clear(key);
-					}
-					wait(tr->commit());
-				} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
-					ASSERT(tenants.size() == 1);
-					for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
-						wait(TenantAPI::deleteTenant(cx.getReference(), tenants[tenantIndex]));
-					}
-				} else {
-					tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-					std::vector<Future<Void>> deleteFutures;
-					for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
-						deleteFutures.push_back(TenantAPI::deleteTenantTransaction(tr, tenants[tenantIndex]));
-					}
-
-					wait(waitForAll(deleteFutures));
-					wait(tr->commit());
-				}
+				// Attempt to delete the tenant(s)
+				wait(deleteImpl(cx, tr, beginTenant, endTenant, tenants, operationType, self));
 
+				// Transaction-based operations do not fail if the tenant isn't present. If we attempted to delete a
+				// single tenant that didn't exist, we can just return.
 				if (!alreadyExists && !endTenant.present() && operationType != OperationType::MANAGEMENT_DATABASE) {
 					return Void();
 				}
 
 				ASSERT(alreadyExists || endTenant.present());
+
+				// Deletion should not succeed if any tenant in the range wasn't empty
 				ASSERT(isEmpty);
+
+				// Update our local state to remove the deleted tenants
 				for (auto tenant : tenants) {
+					auto itr = self->createdTenants.find(tenant);
+					ASSERT(itr != self->createdTenants.end());
+
+					// If the tenant group has no tenants remaining, stop tracking it
+					if (itr->second.tenantGroup.present()) {
+						auto tenantGroupItr = self->createdTenantGroups.find(itr->second.tenantGroup.get());
+						ASSERT(tenantGroupItr != self->createdTenantGroups.end());
+						if (--tenantGroupItr->second.tenantCount == 0) {
+							self->createdTenantGroups.erase(tenantGroupItr);
+						}
+					}
+
 					self->createdTenants.erase(tenant);
 				}
 				return Void();
@@ -342,7 +473,10 @@ struct TenantManagementWorkload : TestWorkload {
 				if (e.code() == error_code_tenant_not_empty) {
 					ASSERT(!isEmpty);
 					return Void();
-				} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
+				}
+
+				// Database-based operations do not need to be retried
+				else if (operationType == OperationType::MANAGEMENT_DATABASE) {
 					if (e.code() == error_code_tenant_not_found) {
 						ASSERT(!alreadyExists && !endTenant.present());
 					} else {
@@ -352,7 +486,10 @@ struct TenantManagementWorkload : TestWorkload {
 						    .detail("EndTenant", endTenant);
 					}
 					return Void();
-				} else {
+				}
+
+				// Transaction-based operations should be retried
+				else {
 					try {
 						wait(tr->onError(e));
 					} catch (Error& e) {
@@ -367,17 +504,25 @@ struct TenantManagementWorkload : TestWorkload {
 		}
 	}
 
-	ACTOR Future<Void> checkTenant(Database cx,
-	                               TenantManagementWorkload* self,
-	                               TenantName tenant,
-	                               TenantData tenantData) {
+	// Performs some validation on a tenant's contents
+	ACTOR static Future<Void> checkTenantContents(Database cx,
+	                                              TenantManagementWorkload* self,
+	                                              TenantName tenant,
+	                                              TenantData tenantData) {
 		state Transaction tr(cx, tenant);
 		loop {
 			try {
+				// We only every store a single key in each tenant. Therefore we expect a range read of the entire
+				// tenant to return either 0 or 1 keys, depending on whether that key has been set.
 				state RangeResult result = wait(tr.getRange(KeyRangeRef(""_sr, "\xff"_sr), 2));
+
+				// An empty tenant should have no data
 				if (tenantData.empty) {
 					ASSERT(result.size() == 0);
-				} else {
+				}
+				// A non-empty tenant should have our single key. The value of that key should be the name of the
+				// tenant.
+				else {
 					ASSERT(result.size() == 1);
 					ASSERT(result[0].key == self->keyName);
 					ASSERT(result[0].value == tenant);
@@ -391,15 +536,22 @@ struct TenantManagementWorkload : TestWorkload {
 		return Void();
 	}
 
+	// Convert the JSON document returned by the special-key space when reading tenant metadata
+	// into a TenantMapEntry
 	static TenantMapEntry jsonToTenantMapEntry(ValueRef tenantJson) {
 		json_spirit::mValue jsonObject;
 		json_spirit::read_string(tenantJson.toString(), jsonObject);
 		JSONDoc jsonDoc(jsonObject);
 
 		int64_t id;
+
 		std::string prefix;
 		std::string base64Prefix;
 		std::string printablePrefix;
+		std::string tenantStateStr;
+		std::string base64TenantGroup;
+		std::string printableTenantGroup;
+
 		jsonDoc.get("id", id);
 		jsonDoc.get("prefix.base64", base64Prefix);
 		jsonDoc.get("prefix.printable", printablePrefix);
@@ -407,53 +559,80 @@ struct TenantManagementWorkload : TestWorkload {
 		prefix = base64::decoder::from_string(base64Prefix);
 		ASSERT(prefix == unprintable(printablePrefix));
 
-		Key prefixKey = KeyRef(prefix);
-		TenantMapEntry entry(id, TenantState::READY);
+		jsonDoc.get("tenant_state", tenantStateStr);
 
-		ASSERT(entry.prefix == prefixKey);
+		Optional<TenantGroupName> tenantGroup;
+		if (jsonDoc.tryGet("tenant_group.base64", base64TenantGroup)) {
+			jsonDoc.get("tenant_group.printable", printableTenantGroup);
+			std::string tenantGroupStr = base64::decoder::from_string(base64TenantGroup);
+			ASSERT(tenantGroupStr == unprintable(printableTenantGroup));
+			tenantGroup = TenantGroupNameRef(tenantGroupStr);
+		}
+
+		TenantMapEntry entry(id, TenantState::READY, tenantGroup);
+		ASSERT(entry.prefix == prefix);
 		return entry;
 	}
 
-	ACTOR Future<Void> getTenant(Database cx, TenantManagementWorkload* self) {
+	// Gets the metadata for a tenant using the specified operation type
+	ACTOR static Future<TenantMapEntry> getImpl(Database cx,
+	                                            Reference<ReadYourWritesTransaction> tr,
+	                                            TenantName tenant,
+	                                            OperationType operationType,
+	                                            TenantManagementWorkload* self) {
+		state TenantMapEntry entry;
+		if (operationType == OperationType::SPECIAL_KEYS) {
+			Key key = self->specialKeysTenantMapPrefix.withSuffix(tenant);
+			Optional<Value> value = wait(tr->get(key));
+			if (!value.present()) {
+				throw tenant_not_found();
+			}
+			entry = TenantManagementWorkload::jsonToTenantMapEntry(value.get());
+		} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
+			TenantMapEntry _entry = wait(TenantAPI::getTenant(cx.getReference(), tenant));
+			entry = _entry;
+		} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
+			tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+			TenantMapEntry _entry = wait(TenantAPI::getTenantTransaction(tr, tenant));
+			entry = _entry;
+		}
+
+		return entry;
+	}
+
+	ACTOR static Future<Void> getTenant(Database cx, TenantManagementWorkload* self) {
 		state TenantName tenant = self->chooseTenantName(true);
-		auto itr = self->createdTenants.find(tenant);
-		state bool alreadyExists = itr != self->createdTenants.end();
-		state TenantData tenantData = itr->second;
 		state OperationType operationType = TenantManagementWorkload::randomOperationType();
 		state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
 
+		// True if the tenant should should exist and return a result
+		auto itr = self->createdTenants.find(tenant);
+		state bool alreadyExists = itr != self->createdTenants.end();
+		state TenantData tenantData = alreadyExists ? itr->second : TenantData();
+
 		loop {
 			try {
-				state TenantMapEntry entry;
-				if (operationType == OperationType::SPECIAL_KEYS) {
-					Key key = self->specialKeysTenantMapPrefix.withSuffix(tenant);
-					Optional<Value> value = wait(tr->get(key));
-					if (!value.present()) {
-						throw tenant_not_found();
-					}
-					entry = TenantManagementWorkload::jsonToTenantMapEntry(value.get());
-				} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
-					TenantMapEntry _entry = wait(TenantAPI::getTenant(cx.getReference(), tenant));
-					entry = _entry;
-				} else {
-					tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
-					TenantMapEntry _entry = wait(TenantAPI::getTenantTransaction(tr, tenant));
-					entry = _entry;
-				}
+				// Get the tenant metadata and check that it matches our local state
+				state TenantMapEntry entry = wait(getImpl(cx, tr, tenant, operationType, self));
 				ASSERT(alreadyExists);
 				ASSERT(entry.id == tenantData.id);
-				wait(self->checkTenant(cx, self, tenant, tenantData));
+				ASSERT(entry.tenantGroup == tenantData.tenantGroup);
+				wait(self->checkTenantContents(cx, self, tenant, tenantData));
 				return Void();
 			} catch (Error& e) {
-				state bool retry = true;
+				state bool retry = false;
 				state Error error = e;
 
 				if (e.code() == error_code_tenant_not_found) {
 					ASSERT(!alreadyExists);
 					return Void();
-				} else if (operationType != OperationType::MANAGEMENT_DATABASE) {
+				}
+
+				// Transaction-based operations should retry
+				else if (operationType != OperationType::MANAGEMENT_DATABASE) {
 					try {
 						wait(tr->onError(e));
+						retry = true;
 					} catch (Error& e) {
 						error = e;
 						retry = false;
@@ -468,7 +647,39 @@ struct TenantManagementWorkload : TestWorkload {
 		}
 	}
 
-	ACTOR Future<Void> listTenants(Database cx, TenantManagementWorkload* self) {
+	// Gets a list of tenants using the specified operation type
+	ACTOR static Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listImpl(
+	    Database cx,
+	    Reference<ReadYourWritesTransaction> tr,
+	    TenantName beginTenant,
+	    TenantName endTenant,
+	    int limit,
+	    OperationType operationType,
+	    TenantManagementWorkload* self) {
+		state std::vector<std::pair<TenantName, TenantMapEntry>> tenants;
+
+		if (operationType == OperationType::SPECIAL_KEYS) {
+			KeyRange range = KeyRangeRef(beginTenant, endTenant).withPrefix(self->specialKeysTenantMapPrefix);
+			RangeResult results = wait(tr->getRange(range, limit));
+			for (auto result : results) {
+				tenants.push_back(std::make_pair(result.key.removePrefix(self->specialKeysTenantMapPrefix),
+				                                 TenantManagementWorkload::jsonToTenantMapEntry(result.value)));
+			}
+		} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
+			std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
+			    wait(TenantAPI::listTenants(cx.getReference(), beginTenant, endTenant, limit));
+			tenants = _tenants;
+		} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
+			tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+			std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
+			    wait(TenantAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));
+			tenants = _tenants;
+		}
+
+		return tenants;
+	}
+
+	ACTOR static Future<Void> listTenants(Database cx, TenantManagementWorkload* self) {
 		state TenantName beginTenant = self->chooseTenantName(false);
 		state TenantName endTenant = self->chooseTenantName(false);
 		state int limit = std::min(CLIENT_KNOBS->TOO_MANY, deterministicRandom()->randomInt(1, self->maxTenants * 2));
@@ -481,27 +692,13 @@ struct TenantManagementWorkload : TestWorkload {
 
 		loop {
 			try {
-				state std::vector<std::pair<TenantName, TenantMapEntry>> tenants;
-				if (operationType == OperationType::SPECIAL_KEYS) {
-					KeyRange range = KeyRangeRef(beginTenant, endTenant).withPrefix(self->specialKeysTenantMapPrefix);
-					RangeResult results = wait(tr->getRange(range, limit));
-					for (auto result : results) {
-						tenants.push_back(std::make_pair(result.key.removePrefix(self->specialKeysTenantMapPrefix),
-						                                 TenantManagementWorkload::jsonToTenantMapEntry(result.value)));
-					}
-				} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
-					std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
-					    wait(TenantAPI::listTenants(cx.getReference(), beginTenant, endTenant, limit));
-					tenants = _tenants;
-				} else {
-					tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
-					std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
-					    wait(TenantAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));
-					tenants = _tenants;
-				}
+				// Attempt to read the chosen list of tenants
+				state std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
+				    wait(listImpl(cx, tr, beginTenant, endTenant, limit, operationType, self));
 
 				ASSERT(tenants.size() <= limit);
 
+				// Compare the resulting tenant list to the list we expected to get
 				auto localItr = self->createdTenants.lower_bound(beginTenant);
 				auto tenantMapItr = tenants.begin();
 				for (; tenantMapItr != tenants.end(); ++tenantMapItr, ++localItr) {
@@ -509,19 +706,18 @@ struct TenantManagementWorkload : TestWorkload {
 					ASSERT(localItr->first == tenantMapItr->first);
 				}
 
-				if (!(tenants.size() == limit || localItr == self->createdTenants.end())) {
-					for (auto tenant : self->createdTenants) {
-						TraceEvent("ExistingTenant").detail("Tenant", tenant.first);
-					}
-				}
+				// Make sure the list terminated at the right spot
 				ASSERT(tenants.size() == limit || localItr == self->createdTenants.end() ||
 				       localItr->first >= endTenant);
 				return Void();
 			} catch (Error& e) {
-				state bool retry = true;
+				state bool retry = false;
 				state Error error = e;
+
+				// Transaction-based operations need to be retried
 				if (operationType != OperationType::MANAGEMENT_DATABASE) {
 					try {
+						retry = true;
 						wait(tr->onError(e));
 					} catch (Error& e) {
 						error = e;
@@ -541,7 +737,7 @@ struct TenantManagementWorkload : TestWorkload {
 		}
 	}
 
-	ACTOR Future<Void> renameTenant(Database cx, TenantManagementWorkload* self) {
+	ACTOR static Future<Void> renameTenant(Database cx, TenantManagementWorkload* self) {
 		// Currently only supporting MANAGEMENT_DATABASE op, so numTenants should always be 1
 		// state OperationType operationType = TenantManagementWorkload::randomOperationType();
 		int numTenants = 1;
@@ -596,7 +792,7 @@ struct TenantManagementWorkload : TestWorkload {
 							}
 						}
 					}
-					wait(self->checkTenant(cx, self, newTenantName, self->createdTenants[newTenantName]));
+					wait(self->checkTenantContents(cx, self, newTenantName, self->createdTenants[newTenantName]));
 				}
 				return Void();
 			} catch (Error& e) {
@@ -622,31 +818,273 @@ struct TenantManagementWorkload : TestWorkload {
 		}
 	}
 
+	// Changes the configuration of a tenant
+	ACTOR static Future<Void> configureImpl(Reference<ReadYourWritesTransaction> tr,
+	                                        TenantName tenant,
+	                                        std::map<Standalone<StringRef>, Optional<Value>> configParameters,
+	                                        OperationType operationType,
+	                                        bool specialKeysUseInvalidTuple,
+	                                        TenantManagementWorkload* self) {
+		if (operationType == OperationType::SPECIAL_KEYS) {
+			tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
+			for (auto const& [config, value] : configParameters) {
+				Tuple t;
+				if (specialKeysUseInvalidTuple) {
+					// Wrong number of items
+					if (deterministicRandom()->coinflip()) {
+						int numItems = deterministicRandom()->randomInt(0, 3);
+						if (numItems > 0) {
+							t.append(tenant);
+						}
+						if (numItems > 1) {
+							t.append(config).append(""_sr);
+						}
+					}
+					// Wrong data types
+					else {
+						if (deterministicRandom()->coinflip()) {
+							t.append(0).append(config);
+						} else {
+							t.append(tenant).append(0);
+						}
+					}
+				} else {
+					t.append(tenant).append(config);
+				}
+				if (value.present()) {
+					tr->set(self->specialKeysTenantConfigPrefix.withSuffix(t.pack()), value.get());
+				} else {
+					tr->clear(self->specialKeysTenantConfigPrefix.withSuffix(t.pack()));
+				}
+			}
+
+			wait(tr->commit());
+			ASSERT(!specialKeysUseInvalidTuple);
+		} else {
+			// We don't have a transaction or database variant of this function
+			ASSERT(false);
+		}
+
+		return Void();
+	}
+
+	ACTOR static Future<Void> configureTenant(Database cx, TenantManagementWorkload* self) {
+		state OperationType operationType = OperationType::SPECIAL_KEYS;
+
+		state TenantName tenant = self->chooseTenantName(true);
+		auto itr = self->createdTenants.find(tenant);
+		state bool exists = itr != self->createdTenants.end();
+		state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
+
+		state std::map<Standalone<StringRef>, Optional<Value>> configuration;
+		state Optional<TenantGroupName> newTenantGroup;
+
+		// If true, the options generated may include an unknown option
+		state bool hasInvalidOption = deterministicRandom()->random01() < 0.1;
+
+		// True if any tenant group name starts with \xff
+		state bool hasSystemTenantGroup = false;
+
+		state bool specialKeysUseInvalidTuple =
+		    operationType == OperationType::SPECIAL_KEYS && deterministicRandom()->random01() < 0.1;
+
+		// Generate a tenant group. Sometimes do this at the same time that we include an invalid option to ensure
+		// that the configure function still fails
+		if (!hasInvalidOption || deterministicRandom()->coinflip()) {
+			newTenantGroup = self->chooseTenantGroup(true);
+			hasSystemTenantGroup = hasSystemTenantGroup || newTenantGroup.orDefault(""_sr).startsWith("\xff"_sr);
+			configuration["tenant_group"_sr] = newTenantGroup;
+		}
+		if (hasInvalidOption) {
+			configuration["invalid_option"_sr] = ""_sr;
+		}
+
+		state bool hasInvalidSpecialKeyTuple = deterministicRandom()->random01() < 0.05;
+
+		loop {
+			try {
+				wait(configureImpl(tr, tenant, configuration, operationType, specialKeysUseInvalidTuple, self));
+
+				ASSERT(exists);
+				ASSERT(!hasInvalidOption);
+				ASSERT(!hasSystemTenantGroup);
+				ASSERT(!hasInvalidSpecialKeyTuple);
+
+				auto itr = self->createdTenants.find(tenant);
+				if (itr->second.tenantGroup.present()) {
+					auto tenantGroupItr = self->createdTenantGroups.find(itr->second.tenantGroup.get());
+					ASSERT(tenantGroupItr != self->createdTenantGroups.end());
+					if (--tenantGroupItr->second.tenantCount == 0) {
+						self->createdTenantGroups.erase(tenantGroupItr);
+					}
+				}
+				if (newTenantGroup.present()) {
+					self->createdTenantGroups[newTenantGroup.get()].tenantCount++;
+				}
+				itr->second.tenantGroup = newTenantGroup;
+				return Void();
+			} catch (Error& e) {
+				state Error error = e;
+				if (e.code() == error_code_tenant_not_found) {
+					ASSERT(!exists);
+					return Void();
+				} else if (e.code() == error_code_special_keys_api_failure) {
+					ASSERT(hasInvalidSpecialKeyTuple || hasInvalidOption);
+					return Void();
+				} else if (e.code() == error_code_invalid_tenant_configuration) {
+					ASSERT(hasInvalidOption);
+					return Void();
+				} else if (e.code() == error_code_invalid_tenant_group_name) {
+					ASSERT(hasSystemTenantGroup);
+					return Void();
+				}
+
+				try {
+					wait(tr->onError(e));
+				} catch (Error&) {
+					TraceEvent(SevError, "ConfigureTenantFailure").error(error).detail("TenantName", tenant);
+					return Void();
+				}
+			}
+		}
+	}
+
 	Future<Void> start(Database const& cx) override { return _start(cx, this); }
 	ACTOR Future<Void> _start(Database cx, TenantManagementWorkload* self) {
 		state double start = now();
+
+		// Run a random sequence of tenant management operations for the duration of the test
 		while (now() < start + self->testDuration) {
-			state int operation = deterministicRandom()->randomInt(0, 5);
+			state int operation = deterministicRandom()->randomInt(0, 6);
 			if (operation == 0) {
-				wait(self->createTenant(cx, self));
+				wait(createTenant(cx, self));
 			} else if (operation == 1) {
-				wait(self->deleteTenant(cx, self));
+				wait(deleteTenant(cx, self));
 			} else if (operation == 2) {
-				wait(self->getTenant(cx, self));
+				wait(getTenant(cx, self));
 			} else if (operation == 3) {
-				wait(self->listTenants(cx, self));
-			} else {
-				wait(self->renameTenant(cx, self));
+				wait(listTenants(cx, self));
+			} else if (operation == 4) {
+				wait(renameTenant(cx, self));
+			} else if (operation == 5) {
+				wait(configureTenant(cx, self));
 			}
 		}
 
 		return Void();
 	}
 
+	// Check that the given tenant group has the expected number of tenants
+	ACTOR template <class DB>
+	static Future<Void> checkTenantGroupTenantCount(Reference<DB> db, TenantGroupName tenantGroup, int expectedCount) {
+		TenantGroupName const& tenantGroupRef = tenantGroup;
+		int const& expectedCountRef = expectedCount;
+
+		KeyBackedSet<Tuple>::RangeResultType tenants =
+		    wait(runTransaction(db, [tenantGroupRef, expectedCountRef](Reference<typename DB::TransactionT> tr) {
+			    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+			    return TenantMetadata::tenantGroupTenantIndex.getRange(tr,
+			                                                           Tuple::makeTuple(tenantGroupRef),
+			                                                           Tuple::makeTuple(keyAfter(tenantGroupRef)),
+			                                                           expectedCountRef + 1);
+		    }));
+
+		ASSERT(tenants.results.size() == expectedCount && !tenants.more);
+		return Void();
+	}
+
+	// Verify that the set of tenants in the database matches our local state
+	ACTOR static Future<Void> compareTenants(Database cx, TenantManagementWorkload* self) {
+		state std::map<TenantName, TenantData>::iterator localItr = self->createdTenants.begin();
+		state std::vector<Future<Void>> checkTenants;
+		state TenantName beginTenant = ""_sr.withPrefix(self->localTenantNamePrefix);
+		state TenantName endTenant = "\xff\xff"_sr.withPrefix(self->localTenantNamePrefix);
+
+		loop {
+			// Read the tenant list
+			state std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
+			    wait(TenantAPI::listTenants(cx.getReference(), beginTenant, endTenant, 1000));
+
+			auto dataItr = tenants.begin();
+
+			TenantNameRef lastTenant;
+			while (dataItr != tenants.end()) {
+				ASSERT(localItr != self->createdTenants.end());
+				ASSERT(dataItr->first == localItr->first);
+				ASSERT(dataItr->second.tenantGroup == localItr->second.tenantGroup);
+
+				checkTenants.push_back(checkTenantContents(cx, self, dataItr->first, localItr->second));
+				lastTenant = dataItr->first;
+
+				++localItr;
+				++dataItr;
+			}
+
+			if (tenants.size() < 1000) {
+				break;
+			} else {
+				beginTenant = keyAfter(lastTenant);
+			}
+		}
+
+		ASSERT(localItr == self->createdTenants.end());
+		wait(waitForAll(checkTenants));
+		return Void();
+	}
+
+	// Verify that the set of tenants in the database matches our local state
+	ACTOR static Future<Void> compareTenantGroups(Database cx, TenantManagementWorkload* self) {
+		// Verify that the set of tena
+		state std::map<TenantName, TenantGroupData>::iterator localItr = self->createdTenantGroups.begin();
+		state TenantName beginTenantGroup = ""_sr.withPrefix(self->localTenantGroupNamePrefix);
+		state TenantName endTenantGroup = "\xff\xff"_sr.withPrefix(self->localTenantGroupNamePrefix);
+		state std::vector<Future<Void>> checkTenantGroups;
+
+		loop {
+			// Read the tenant group list
+			state KeyBackedRangeResult<std::pair<TenantGroupName, TenantGroupEntry>> tenantGroups;
+			TenantName const& beginTenantGroupRef = beginTenantGroup;
+			TenantName const& endTenantGroupRef = endTenantGroup;
+			KeyBackedRangeResult<std::pair<TenantGroupName, TenantGroupEntry>> _tenantGroups = wait(runTransaction(
+			    cx.getReference(), [beginTenantGroupRef, endTenantGroupRef](Reference<ReadYourWritesTransaction> tr) {
+				    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+				    return TenantMetadata::tenantGroupMap.getRange(tr, beginTenantGroupRef, endTenantGroupRef, 1000);
+			    }));
+			tenantGroups = _tenantGroups;
+
+			auto dataItr = tenantGroups.results.begin();
+
+			TenantGroupNameRef lastTenantGroup;
+			while (dataItr != tenantGroups.results.end()) {
+				ASSERT(localItr != self->createdTenantGroups.end());
+				ASSERT(dataItr->first == localItr->first);
+				lastTenantGroup = dataItr->first;
+
+				checkTenantGroups.push_back(
+				    checkTenantGroupTenantCount(cx.getReference(), dataItr->first, localItr->second.tenantCount));
+
+				++localItr;
+				++dataItr;
+			}
+
+			if (!tenantGroups.more) {
+				break;
+			} else {
+				beginTenantGroup = keyAfter(lastTenantGroup);
+			}
+		}
+
+		ASSERT(localItr == self->createdTenantGroups.end());
+		return Void();
+	}
+
 	Future<bool> check(Database const& cx) override { return _check(cx, this); }
-	ACTOR Future<bool> _check(Database cx, TenantManagementWorkload* self) {
+	ACTOR static Future<bool> _check(Database cx, TenantManagementWorkload* self) {
 		state Transaction tr(cx);
 
+		// Check that the key we set outside of the tenant is present and has the correct value
+		// This is the same key we set inside some of our tenants, so this checks that no tenant
+		// writes accidentally happened in the raw key-space
 		loop {
 			try {
 				tr.setOption(FDBTransactionOptions::RAW_ACCESS);
@@ -658,34 +1096,7 @@ struct TenantManagementWorkload : TestWorkload {
 			}
 		}
 
-		state std::map<TenantName, TenantData>::iterator itr = self->createdTenants.begin();
-		state std::vector<Future<Void>> checkTenants;
-		state TenantName beginTenant = ""_sr.withPrefix(self->localTenantNamePrefix);
-		state TenantName endTenant = "\xff\xff"_sr.withPrefix(self->localTenantNamePrefix);
-
-		loop {
-			std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
-			    wait(TenantAPI::listTenants(cx.getReference(), beginTenant, endTenant, 1000));
-
-			TenantNameRef lastTenant;
-			for (auto tenant : tenants) {
-				ASSERT(itr != self->createdTenants.end());
-				ASSERT(tenant.first == itr->first);
-				checkTenants.push_back(self->checkTenant(cx, self, tenant.first, itr->second));
-				lastTenant = tenant.first;
-				++itr;
-			}
-
-			if (tenants.size() < 1000) {
-				break;
-			} else {
-				beginTenant = keyAfter(lastTenant);
-			}
-		}
-
-		ASSERT(itr == self->createdTenants.end());
-		wait(waitForAll(checkTenants));
-
+		wait(compareTenants(cx, self) && compareTenantGroups(cx, self));
 		return true;
 	}
 
diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h
index 519a0cb8b0..56520c8c15 100755
--- a/flow/include/flow/error_definitions.h
+++ b/flow/include/flow/error_definitions.h
@@ -228,11 +228,13 @@ ERROR( tenant_name_required, 2130, "Tenant name must be specified to access data
 ERROR( tenant_not_found, 2131, "Tenant does not exist" )
 ERROR( tenant_already_exists, 2132, "A tenant with the given name already exists" )
 ERROR( tenant_not_empty, 2133, "Cannot delete a non-empty tenant" )
-ERROR( invalid_tenant_name, 2134, "Tenant name cannot begin with \\xff");
-ERROR( tenant_prefix_allocator_conflict, 2135, "The database already has keys stored at the prefix allocated for the tenant");
-ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster");
-ERROR( unknown_tenant, 2137, "Tenant is not available from this server")
-ERROR( illegal_tenant_access, 2138, "Illegal tenant access")
+ERROR( invalid_tenant_name, 2134, "Tenant name cannot begin with \\xff" )
+ERROR( tenant_prefix_allocator_conflict, 2135, "The database already has keys stored at the prefix allocated for the tenant" )
+ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster" )
+ERROR( unknown_tenant, 2137, "Tenant is not available from this server" )
+ERROR( illegal_tenant_access, 2138, "Illegal tenant access" )
+ERROR( invalid_tenant_group_name, 2139, "Tenant group name cannot begin with \\xff" )
+ERROR( invalid_tenant_configuration, 2140, "Tenant configuration is invalid" )
 
 // 2200 - errors from bindings and official APIs
 ERROR( api_version_unset, 2200, "API version is not set" )
diff --git a/tests/slow/SwizzledTenantManagement.toml b/tests/slow/SwizzledTenantManagement.toml
index e7129f1061..d34544eaa8 100644
--- a/tests/slow/SwizzledTenantManagement.toml
+++ b/tests/slow/SwizzledTenantManagement.toml
@@ -10,7 +10,7 @@ runSetup = true
 
     [[test.workload]]
     testName = 'TenantManagement'
-	maxTenants = 1000
+    maxTenants = 1000
     testDuration = 60
 
     [[test.workload]]
diff --git a/tests/slow/TenantManagement.toml b/tests/slow/TenantManagement.toml
index 5848bdf4e3..f03bc421f2 100644
--- a/tests/slow/TenantManagement.toml
+++ b/tests/slow/TenantManagement.toml
@@ -10,5 +10,5 @@ runSetup = true
 
     [[test.workload]]
     testName = 'TenantManagement'
-	maxTenants = 1000
-	testDuration = 60
+    maxTenants = 1000
+    testDuration = 60