From 2f67328a0c2d633e92ff00a4f3695f7baf133f5c Mon Sep 17 00:00:00 2001
From: "A.J. Beamon" <aj.beamon@snowflake.com>
Date: Thu, 30 Jun 2022 15:03:37 -0700
Subject: [PATCH 1/2] Update the tenant special keys submodule to support
 multiple sub-ranges. This will enable future work that allows configuring
 tenants at the same time as creating them.

---
 bindings/bindingtester/bindingtester.py       |   2 +-
 bindings/c/test/fdb_api.hpp                   |  28 ++-
 bindings/c/test/unit/unit_tests.cpp           |   8 +-
 .../src/main/com/apple/foundationdb/FDB.java  |   7 +
 .../apple/foundationdb/TenantManagement.java  |   4 +-
 bindings/python/fdb/__init__.py               |   2 +
 bindings/python/fdb/tenant_management.py      |   2 +-
 bindings/python/tests/tenant_tests.py         |   4 +-
 documentation/sphinx/source/special-keys.rst  |   5 +-
 documentation/sphinx/source/tenants.rst       |   2 +-
 fdbcli/TenantCommands.actor.cpp               |   4 +-
 fdbclient/NativeAPI.actor.cpp                 |  11 +-
 fdbclient/SpecialKeySpace.actor.cpp           |   6 +-
 fdbclient/TenantSpecialKeys.actor.cpp         | 149 -------------
 fdbclient/TenantSpecialKeys.cpp               |  33 +++
 .../include/fdbclient/SpecialKeySpace.actor.h |  11 -
 .../fdbclient/TenantSpecialKeys.actor.h       | 209 ++++++++++++++++++
 .../TenantManagementWorkload.actor.cpp        |   6 +-
 18 files changed, 299 insertions(+), 194 deletions(-)
 delete mode 100644 fdbclient/TenantSpecialKeys.actor.cpp
 create mode 100644 fdbclient/TenantSpecialKeys.cpp
 create mode 100644 fdbclient/include/fdbclient/TenantSpecialKeys.actor.h

diff --git a/bindings/bindingtester/bindingtester.py b/bindings/bindingtester/bindingtester.py
index a87ad6b655..508ede8998 100755
--- a/bindings/bindingtester/bindingtester.py
+++ b/bindings/bindingtester/bindingtester.py
@@ -288,7 +288,7 @@ class TestRunner(object):
             tr = self.db.create_transaction()
             try:
                 tr.options.set_special_key_space_enable_writes()
-                del tr[b'\xff\xff/management/tenant_map/' : b'\xff\xff/management/tenant_map0']
+                del tr[b'\xff\xff/management/tenant/map/' : b'\xff\xff/management/tenant/map0']
                 tr.commit().wait()
                 break
             except fdb.FDBError as e:
diff --git a/bindings/c/test/fdb_api.hpp b/bindings/c/test/fdb_api.hpp
index 24ef97fad0..0ac415c7b3 100644
--- a/bindings/c/test/fdb_api.hpp
+++ b/bindings/c/test/fdb_api.hpp
@@ -197,16 +197,6 @@ inline int maxApiVersion() {
 	return native::fdb_get_max_api_version();
 }
 
-inline Error selectApiVersionNothrow(int version) {
-	return Error(native::fdb_select_api_version(version));
-}
-
-inline void selectApiVersion(int version) {
-	if (auto err = selectApiVersionNothrow(version)) {
-		throwError(fmt::format("ERROR: fdb_select_api_version({}): ", version), err);
-	}
-}
-
 namespace network {
 
 inline Error setOptionNothrow(FDBNetworkOption option, BytesRef str) noexcept {
@@ -588,14 +578,15 @@ class Tenant final {
 	friend class Database;
 	std::shared_ptr<native::FDBTenant> tenant;
 
-	static constexpr CharsRef tenantManagementMapPrefix = "\xff\xff/management/tenant_map/";
-
 	explicit Tenant(native::FDBTenant* tenant_raw) {
 		if (tenant_raw)
 			tenant = std::shared_ptr<native::FDBTenant>(tenant_raw, &native::fdb_tenant_destroy);
 	}
 
 public:
+	// This should only be mutated by API versioning
+	static inline CharsRef tenantManagementMapPrefix = "\xff\xff/management/tenant/map/";
+
 	Tenant(const Tenant&) noexcept = default;
 	Tenant& operator=(const Tenant&) noexcept = default;
 	Tenant() noexcept : tenant(nullptr) {}
@@ -684,6 +675,19 @@ public:
 	}
 };
 
+inline Error selectApiVersionNothrow(int version) {
+	if (version < 720) {
+		Tenant::tenantManagementMapPrefix = "\xff\xff/management/tenant_map/";
+	}
+	return Error(native::fdb_select_api_version(version));
+}
+
+inline void selectApiVersion(int version) {
+	if (auto err = selectApiVersionNothrow(version)) {
+		throwError(fmt::format("ERROR: fdb_select_api_version({}): ", version), err);
+	}
+}
+
 } // namespace fdb
 
 template <>
diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp
index 3152d7b1a4..60af78c885 100644
--- a/bindings/c/test/unit/unit_tests.cpp
+++ b/bindings/c/test/unit/unit_tests.cpp
@@ -2624,7 +2624,7 @@ TEST_CASE("Tenant create, access, and delete") {
 	fdb::Transaction tr(db);
 	while (1) {
 		fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
-		tr.set("\xff\xff/management/tenant_map/" + tenantName, "");
+		tr.set("\xff\xff/management/tenant/map/" + tenantName, "");
 		fdb::EmptyFuture commitFuture = tr.commit();
 		fdb_error_t err = wait_future(commitFuture);
 		if (err) {
@@ -2637,8 +2637,8 @@ TEST_CASE("Tenant create, access, and delete") {
 	}
 
 	while (1) {
-		StringRef begin = "\xff\xff/management/tenant_map/"_sr;
-		StringRef end = "\xff\xff/management/tenant_map0"_sr;
+		StringRef begin = "\xff\xff/management/tenant/map/"_sr;
+		StringRef end = "\xff\xff/management/tenant/map0"_sr;
 
 		fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
 		fdb::KeyValueArrayFuture f = tr.get_range(FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(begin.begin(), begin.size()),
@@ -2716,7 +2716,7 @@ TEST_CASE("Tenant create, access, and delete") {
 
 	while (1) {
 		fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
-		tr.clear("\xff\xff/management/tenant_map/" + tenantName);
+		tr.clear("\xff\xff/management/tenant/map/" + tenantName);
 		fdb::EmptyFuture commitFuture = tr.commit();
 		fdb_error_t err = wait_future(commitFuture);
 		if (err) {
diff --git a/bindings/java/src/main/com/apple/foundationdb/FDB.java b/bindings/java/src/main/com/apple/foundationdb/FDB.java
index 4932e58ba5..5215d0836e 100644
--- a/bindings/java/src/main/com/apple/foundationdb/FDB.java
+++ b/bindings/java/src/main/com/apple/foundationdb/FDB.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.apple.foundationdb.tuple.ByteArrayUtil;
+
 /**
  * The starting point for accessing FoundationDB.
  *  <br>
@@ -189,6 +191,11 @@ public class FDB {
 		Select_API_version(version);
 		singleton = new FDB(version);
 
+		if (version < 720) {
+			TenantManagement.TENANT_MAP_PREFIX = ByteArrayUtil.join(new byte[] { (byte)255, (byte)255 },
+					"/management/tenant_map/".getBytes());
+		}
+
 		return singleton;
 	}
 
diff --git a/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java b/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
index 262ebfef7c..a1de1b5d8e 100644
--- a/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
+++ b/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
@@ -40,8 +40,8 @@ import com.apple.foundationdb.tuple.Tuple;
  * The FoundationDB API includes function to manage the set of tenants in a cluster.
  */
 public class TenantManagement {
-	static final byte[] TENANT_MAP_PREFIX = ByteArrayUtil.join(new byte[] { (byte)255, (byte)255 },
-	                                                           "/management/tenant_map/".getBytes());
+	static byte[] TENANT_MAP_PREFIX = ByteArrayUtil.join(new byte[] { (byte)255, (byte)255 },
+														 "/management/tenant/map/".getBytes());
 
 	/**
 	 * Creates a new tenant in the cluster. If the tenant already exists, this operation will complete
diff --git a/bindings/python/fdb/__init__.py b/bindings/python/fdb/__init__.py
index 59af33873a..e7d1a8bc30 100644
--- a/bindings/python/fdb/__init__.py
+++ b/bindings/python/fdb/__init__.py
@@ -102,6 +102,8 @@ def api_version(ver):
 
     if ver >= 710:
         import fdb.tenant_management
+        if ver < 720:
+            fdb.tenant_management._tenant_map_prefix = b'\xff\xff/management/tenant_map/'
 
     if ver < 610:
         globals()["init"] = getattr(fdb.impl, "init")
diff --git a/bindings/python/fdb/tenant_management.py b/bindings/python/fdb/tenant_management.py
index 061c6961b3..84c3a46d03 100644
--- a/bindings/python/fdb/tenant_management.py
+++ b/bindings/python/fdb/tenant_management.py
@@ -25,7 +25,7 @@ https://apple.github.io/foundationdb/api-python.html"""
 
 from fdb import impl as _impl
 
-_tenant_map_prefix = b'\xff\xff/management/tenant_map/'
+_tenant_map_prefix = b'\xff\xff/management/tenant/map/'
 
 # If the existence_check_marker is an empty list, then check whether the tenant exists. 
 # After the check, append an item to the existence_check_marker list so that subsequent
diff --git a/bindings/python/tests/tenant_tests.py b/bindings/python/tests/tenant_tests.py
index 0570c42537..899be41ee3 100755
--- a/bindings/python/tests/tenant_tests.py
+++ b/bindings/python/tests/tenant_tests.py
@@ -78,12 +78,12 @@ def test_tenant_operations(db):
     tenant1[b'tenant_test_key'] = b'tenant1'
     tenant2[b'tenant_test_key'] = b'tenant2'
 
-    tenant1_entry = db[b'\xff\xff/management/tenant_map/tenant1']
+    tenant1_entry = db[b'\xff\xff/management/tenant/map/tenant1']
     tenant1_json = json.loads(tenant1_entry)
     prefix1 = tenant1_json['prefix'].encode('utf8')
     assert prefix1 == p1
 
-    tenant2_entry = db[b'\xff\xff/management/tenant_map/tenant2']
+    tenant2_entry = db[b'\xff\xff/management/tenant/map/tenant2']
     tenant2_json = json.loads(tenant2_entry)
     prefix2 = tenant2_json['prefix'].encode('utf8')
     assert prefix2 == p2
diff --git a/documentation/sphinx/source/special-keys.rst b/documentation/sphinx/source/special-keys.rst
index 12ec0e00e0..83ffeacac1 100644
--- a/documentation/sphinx/source/special-keys.rst
+++ b/documentation/sphinx/source/special-keys.rst
@@ -204,7 +204,7 @@ that process, and wait for necessary data to be moved away.
 #. ``\xff\xff/management/failed_locality/<locality>`` Read/write. Indicates that the cluster should consider matching processes as permanently failed. This allows the cluster to avoid maintaining extra state and doing extra work in the hope that these processes come back. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
 #. ``\xff\xff/management/options/excluded_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
 #. ``\xff\xff/management/options/failed_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
-#. ``\xff\xff/management/tenant_map/<tenant>`` Read/write. Setting a key in this range to any value will result in a tenant being created with name ``<tenant>``. Clearing a key in this range will delete the tenant with name ``<tenant>``. Reading all or a portion of this range will return the list of tenants currently present in the cluster, excluding any changes in this transaction. Values read in this range will be JSON objects containing the metadata for the associated tenants.
+#. ``\xff\xff/management/tenant/map/<tenant>`` Read/write. Setting a key in this range to any value will result in a tenant being created with name ``<tenant>``. Clearing a key in this range will delete the tenant with name ``<tenant>``. Reading all or a portion of this range will return the list of tenants currently present in the cluster, excluding any changes in this transaction. Values read in this range will be JSON objects containing the metadata for the associated tenants.
 
 An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
 an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g ``locality_dcid:primary-satellite`` or
@@ -270,7 +270,8 @@ Deprecated Keys
 
 Listed below are the special keys that have been deprecated. Special key(s) will no longer be accessible when the client specifies an API version equal to or larger than the version where they were deprecated. Clients specifying older API versions will be able to continue using the deprecated key(s).
 
-#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Deprecated as of API version 7.2. The corresponding functionalities are now covered by the global configuration module. For details, see :doc:`global-configuration`. Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/client_txn_sample_rate`` is a literal text of ``double``, and the value of ``\xff\xff/management/client_txn_size_limit`` is a literal text of ``int64_t``. A special value ``default`` can be set to or read from these two keys, representing the client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see help text of ``fdbcli`` command ``profile client``.
+#. ``\xff\xff/management/profiling/<client_txn_sample_rate|client_txn_size_limit>`` Deprecated as of API version 720. The corresponding functionalities are now covered by the global configuration module. For details, see :doc:`global-configuration`. Read/write. Changing these two keys will change the corresponding system keys ``\xff\x02/fdbClientInfo/<client_txn_sample_rate|client_txn_size_limit>``, respectively. The value of ``\xff\xff/management/client_txn_sample_rate`` is a literal text of ``double``, and the value of ``\xff\xff/management/client_txn_size_limit`` is a literal text of ``int64_t``. A special value ``default`` can be set to or read from these two keys, representing the client profiling is disabled. In addition, ``clear`` in this range is not allowed. For more details, see help text of ``fdbcli`` command ``profile client``.
+#. ``\xff\xff/management/tenant_map/<tenant>`` Removed as of API version 720 and renamed to ``\xff\xff/management/tenant/map/<tenant>``.
 
 Versioning
 ==========
diff --git a/documentation/sphinx/source/tenants.rst b/documentation/sphinx/source/tenants.rst
index 4ef41f74b4..d22603b20e 100644
--- a/documentation/sphinx/source/tenants.rst
+++ b/documentation/sphinx/source/tenants.rst
@@ -31,7 +31,7 @@ FoundationDB clusters support the following tenant modes:
 Creating and deleting tenants
 =============================
 
-Tenants can be created and deleted using the ``\xff\xff/management/tenant_map/<tenant_name>`` :doc:`special key <special-keys>` range as well as by using APIs provided in some language bindings. 
+Tenants can be created and deleted using the ``\xff\xff/management/tenant/map/<tenant_name>`` :doc:`special key <special-keys>` range as well as by using APIs provided in some language bindings. 
 
 Tenants can be created with any byte-string name that does not begin with the ``\xff`` character. Once created, a tenant will be assigned an ID and a prefix where its data will reside.
 
diff --git a/fdbcli/TenantCommands.actor.cpp b/fdbcli/TenantCommands.actor.cpp
index bdf869a438..50608e404e 100644
--- a/fdbcli/TenantCommands.actor.cpp
+++ b/fdbcli/TenantCommands.actor.cpp
@@ -33,8 +33,8 @@
 
 namespace fdb_cli {
 
-const KeyRangeRef tenantSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant_map/"),
-                                        LiteralStringRef("\xff\xff/management/tenant_map0"));
+const KeyRangeRef tenantSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant/map/"),
+                                        LiteralStringRef("\xff\xff/management/tenant/map0"));
 
 // createtenant command
 ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index beb0f1fb3d..117e904010 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -61,6 +61,7 @@
 #include "fdbclient/SpecialKeySpace.actor.h"
 #include "fdbclient/StorageServerInterface.h"
 #include "fdbclient/SystemData.h"
+#include "fdbclient/TenantSpecialKeys.actor.h"
 #include "fdbclient/TransactionLineage.h"
 #include "fdbclient/versions.h"
 #include "fdbrpc/WellKnownEndpoints.h"
@@ -1491,11 +1492,17 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
 	smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
 	globalConfig = std::make_unique<GlobalConfig>(this);
 
-	if (apiVersionAtLeast(710)) {
+	if (apiVersionAtLeast(720)) {
 		registerSpecialKeysImpl(
 		    SpecialKeySpace::MODULE::MANAGEMENT,
 		    SpecialKeySpace::IMPLTYPE::READWRITE,
-		    std::make_unique<TenantMapRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("tenantmap")));
+		    std::make_unique<TenantRangeImpl<true>>(SpecialKeySpace::getManagementApiCommandRange("tenant")));
+	}
+	if (apiVersionAtLeast(710) && !apiVersionAtLeast(720)) {
+		registerSpecialKeysImpl(
+		    SpecialKeySpace::MODULE::MANAGEMENT,
+		    SpecialKeySpace::IMPLTYPE::READWRITE,
+		    std::make_unique<TenantRangeImpl<false>>(SpecialKeySpace::getManagementApiCommandRange("tenantmap")));
 	}
 	if (apiVersionAtLeast(700)) {
 		registerSpecialKeysImpl(SpecialKeySpace::MODULE::ERRORMSG,
diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp
index c64a361357..baa3803c08 100644
--- a/fdbclient/SpecialKeySpace.actor.cpp
+++ b/fdbclient/SpecialKeySpace.actor.cpp
@@ -55,8 +55,6 @@ static bool isAlphaNumeric(const std::string& key) {
 }
 } // namespace
 
-const KeyRangeRef TenantMapRangeImpl::submoduleRange = KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr);
-
 std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
 	{ SpecialKeySpace::MODULE::TRANSACTION,
 	  KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")) },
@@ -117,7 +115,9 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
 	{ "datadistribution",
 	  KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
 	      .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
-	{ "tenantmap", TenantMapRangeImpl::submoduleRange.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
+	{ "tenant", KeyRangeRef("tenant/"_sr, "tenant0"_sr).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
+	{ "tenantmap",
+	  KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
 };
 
 std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiCommandToRange = {
diff --git a/fdbclient/TenantSpecialKeys.actor.cpp b/fdbclient/TenantSpecialKeys.actor.cpp
deleted file mode 100644
index 7aed868f54..0000000000
--- a/fdbclient/TenantSpecialKeys.actor.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * TenantSpecialKeys.actor.cpp
- *
- * This source file is part of the FoundationDB open source project
- *
- * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fdbclient/ActorLineageProfiler.h"
-#include "fdbclient/FDBOptions.g.h"
-#include "fdbclient/Knobs.h"
-#include "fdbclient/DatabaseContext.h"
-#include "fdbclient/SpecialKeySpace.actor.h"
-#include "fdbclient/TenantManagement.actor.h"
-#include "flow/Arena.h"
-#include "flow/UnitTest.h"
-#include "flow/actorcompiler.h" // This must be the last #include.
-
-ACTOR Future<RangeResult> getTenantList(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
-	state KeyRef managementPrefix =
-	    kr.begin.substr(0,
-	                    SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
-	                        TenantMapRangeImpl::submoduleRange.begin.size());
-
-	kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
-	TenantNameRef beginTenant = kr.begin.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
-
-	TenantNameRef endTenant = kr.end;
-	if (endTenant.startsWith(TenantMapRangeImpl::submoduleRange.begin)) {
-		endTenant = endTenant.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
-	} else {
-		endTenant = "\xff"_sr;
-	}
-
-	std::map<TenantName, TenantMapEntry> tenants =
-	    wait(TenantAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, limitsHint.rows));
-
-	RangeResult results;
-	for (auto tenant : tenants) {
-		json_spirit::mObject tenantEntry;
-		tenantEntry["id"] = tenant.second.id;
-		tenantEntry["prefix"] = tenant.second.prefix.toString();
-		std::string tenantEntryString = json_spirit::write_string(json_spirit::mValue(tenantEntry));
-		ValueRef tenantEntryBytes(results.arena(), tenantEntryString);
-		results.push_back(results.arena(),
-		                  KeyValueRef(tenant.first.withPrefix(managementPrefix, results.arena()), tenantEntryBytes));
-	}
-
-	return results;
-}
-
-TenantMapRangeImpl::TenantMapRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
-
-Future<RangeResult> TenantMapRangeImpl::getRange(ReadYourWritesTransaction* ryw,
-                                                 KeyRangeRef kr,
-                                                 GetRangeLimits limitsHint) const {
-	return getTenantList(ryw, kr, limitsHint);
-}
-
-ACTOR Future<Void> createTenants(ReadYourWritesTransaction* ryw, std::vector<TenantNameRef> tenants) {
-	Optional<Value> lastIdVal = wait(ryw->getTransaction().get(tenantLastIdKey));
-	int64_t previousId = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) : -1;
-
-	std::vector<Future<Void>> createFutures;
-	for (auto tenant : tenants) {
-		createFutures.push_back(
-		    success(TenantAPI::createTenantTransaction(&ryw->getTransaction(), tenant, ++previousId)));
-	}
-
-	ryw->getTransaction().set(tenantLastIdKey, TenantMapEntry::idToPrefix(previousId));
-	wait(waitForAll(createFutures));
-	return Void();
-}
-
-ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw,
-                                     TenantNameRef beginTenant,
-                                     TenantNameRef endTenant) {
-	std::map<TenantName, TenantMapEntry> tenants =
-	    wait(TenantAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
-
-	if (tenants.size() == CLIENT_KNOBS->TOO_MANY) {
-		TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
-		    .detail("BeginTenant", beginTenant)
-		    .detail("EndTenant", endTenant);
-		ryw->setSpecialKeySpaceErrorMsg("too many tenants to range delete");
-		throw special_keys_api_failure();
-	}
-
-	std::vector<Future<Void>> deleteFutures;
-	for (auto tenant : tenants) {
-		deleteFutures.push_back(TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenant.first));
-	}
-
-	wait(waitForAll(deleteFutures));
-	return Void();
-}
-
-Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransaction* ryw) {
-	auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
-	std::vector<TenantNameRef> tenantsToCreate;
-	std::vector<Future<Void>> tenantManagementFutures;
-	for (auto range : ranges) {
-		if (!range.value().first) {
-			continue;
-		}
-
-		TenantNameRef tenantName =
-		    range.begin()
-		        .removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
-		        .removePrefix(TenantMapRangeImpl::submoduleRange.begin);
-
-		if (range.value().second.present()) {
-			tenantsToCreate.push_back(tenantName);
-		} else {
-			// For a single key clear, just issue the delete
-			if (KeyRangeRef(range.begin(), range.end()).singleKeyRange()) {
-				tenantManagementFutures.push_back(
-				    TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
-			} else {
-				TenantNameRef endTenant = range.end().removePrefix(
-				    SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
-				if (endTenant.startsWith(submoduleRange.begin)) {
-					endTenant = endTenant.removePrefix(submoduleRange.begin);
-				} else {
-					endTenant = "\xff"_sr;
-				}
-				tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, endTenant));
-			}
-		}
-	}
-
-	if (tenantsToCreate.size()) {
-		tenantManagementFutures.push_back(createTenants(ryw, tenantsToCreate));
-	}
-
-	return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
-}
\ No newline at end of file
diff --git a/fdbclient/TenantSpecialKeys.cpp b/fdbclient/TenantSpecialKeys.cpp
new file mode 100644
index 0000000000..05d51b7e93
--- /dev/null
+++ b/fdbclient/TenantSpecialKeys.cpp
@@ -0,0 +1,33 @@
+/*
+ * TenantSpecialKeys.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/TenantSpecialKeys.actor.h"
+
+template <>
+const KeyRangeRef TenantRangeImpl<true>::submoduleRange = KeyRangeRef("tenant/"_sr, "tenant0"_sr);
+
+template <>
+const KeyRangeRef TenantRangeImpl<true>::mapSubRange = KeyRangeRef("map/"_sr, "map0"_sr);
+
+template <>
+const KeyRangeRef TenantRangeImpl<false>::submoduleRange = KeyRangeRef(""_sr, "\xff"_sr);
+
+template <>
+const KeyRangeRef TenantRangeImpl<false>::mapSubRange = KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr);
diff --git a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h
index 44f1646b8e..fcff6dbb0e 100644
--- a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h
+++ b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h
@@ -538,16 +538,5 @@ public:
 	Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
 };
 
-class TenantMapRangeImpl : public SpecialKeyRangeRWImpl {
-public:
-	const static KeyRangeRef submoduleRange;
-
-	explicit TenantMapRangeImpl(KeyRangeRef kr);
-	Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
-	                             KeyRangeRef kr,
-	                             GetRangeLimits limitsHint) const override;
-	Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
-};
-
 #include "flow/unactorcompiler.h"
 #endif
diff --git a/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h b/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h
new file mode 100644
index 0000000000..9cde887343
--- /dev/null
+++ b/fdbclient/include/fdbclient/TenantSpecialKeys.actor.h
@@ -0,0 +1,209 @@
+/*
+ * TenantSpecialKeys.actor.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_TENANT_SPECIAL_KEYS_ACTOR_G_H)
+#define FDBCLIENT_TENANT_SPECIAL_KEYS_ACTOR_G_H
+#include "fdbclient/TenantSpecialKeys.actor.g.h"
+#elif !defined(FDBCLIENT_TENANT_SPECIAL_KEYS_ACTOR_H)
+#define FDBCLIENT_TENANT_SPECIAL_KEYS_ACTOR_H
+
+#include "fdbclient/ActorLineageProfiler.h"
+#include "fdbclient/FDBOptions.g.h"
+#include "fdbclient/Knobs.h"
+#include "fdbclient/DatabaseContext.h"
+#include "fdbclient/SpecialKeySpace.actor.h"
+#include "fdbclient/TenantManagement.actor.h"
+#include "flow/Arena.h"
+#include "flow/UnitTest.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+template <bool HasSubRanges = true>
+class TenantRangeImpl : public SpecialKeyRangeRWImpl {
+private:
+	static KeyRangeRef removePrefix(KeyRangeRef range, KeyRef prefix, KeyRef defaultEnd) {
+		KeyRef begin = range.begin.removePrefix(prefix);
+		KeyRef end;
+		if (range.end.startsWith(prefix)) {
+			end = range.end.removePrefix(prefix);
+		} else {
+			end = defaultEnd;
+		}
+
+		return KeyRangeRef(begin, end);
+	}
+
+	static KeyRef withTenantMapPrefix(KeyRef key, Arena& ar) {
+		int keySize = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
+		              TenantRangeImpl::submoduleRange.begin.size() + TenantRangeImpl::mapSubRange.begin.size() +
+		              key.size();
+
+		KeyRef prefixedKey = makeString(keySize, ar);
+		uint8_t* mutableKey = mutateString(prefixedKey);
+
+		mutableKey = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.copyTo(mutableKey);
+		mutableKey = TenantRangeImpl::submoduleRange.begin.copyTo(mutableKey);
+		mutableKey = TenantRangeImpl::mapSubRange.begin.copyTo(mutableKey);
+
+		key.copyTo(mutableKey);
+		return prefixedKey;
+	}
+
+	ACTOR static Future<Void> getTenantList(ReadYourWritesTransaction* ryw,
+	                                        KeyRangeRef kr,
+	                                        RangeResult* results,
+	                                        GetRangeLimits limitsHint) {
+		std::map<TenantName, TenantMapEntry> tenants =
+		    wait(TenantAPI::listTenantsTransaction(&ryw->getTransaction(), kr.begin, kr.end, limitsHint.rows));
+
+		for (auto tenant : tenants) {
+			json_spirit::mObject tenantEntry;
+			tenantEntry["id"] = tenant.second.id;
+			tenantEntry["prefix"] = tenant.second.prefix.toString();
+			std::string tenantEntryString = json_spirit::write_string(json_spirit::mValue(tenantEntry));
+			ValueRef tenantEntryBytes(results->arena(), tenantEntryString);
+			results->push_back(results->arena(),
+			                   KeyValueRef(withTenantMapPrefix(tenant.first, results->arena()), tenantEntryBytes));
+		}
+
+		return Void();
+	}
+
+	ACTOR static Future<RangeResult> getTenantRange(ReadYourWritesTransaction* ryw,
+	                                                KeyRangeRef kr,
+	                                                GetRangeLimits limitsHint) {
+		state RangeResult results;
+
+		kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
+		         .removePrefix(TenantRangeImpl::submoduleRange.begin);
+
+		if (kr.intersects(TenantRangeImpl::mapSubRange)) {
+			GetRangeLimits limits = limitsHint;
+			limits.decrement(results);
+			wait(getTenantList(
+			    ryw,
+			    removePrefix(kr & TenantRangeImpl::mapSubRange, TenantRangeImpl::mapSubRange.begin, "\xff"_sr),
+			    &results,
+			    limits));
+		}
+
+		return results;
+	}
+
+	ACTOR static Future<Void> createTenants(ReadYourWritesTransaction* ryw, std::vector<TenantNameRef> tenants) {
+		Optional<Value> lastIdVal = wait(ryw->getTransaction().get(tenantLastIdKey));
+		int64_t previousId = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) : -1;
+
+		std::vector<Future<Void>> createFutures;
+		for (auto tenant : tenants) {
+			createFutures.push_back(
+			    success(TenantAPI::createTenantTransaction(&ryw->getTransaction(), tenant, ++previousId)));
+		}
+
+		ryw->getTransaction().set(tenantLastIdKey, TenantMapEntry::idToPrefix(previousId));
+		wait(waitForAll(createFutures));
+		return Void();
+	}
+
+	ACTOR static Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw,
+	                                            TenantName beginTenant,
+	                                            TenantName endTenant) {
+		state std::map<TenantName, TenantMapEntry> tenants = wait(
+		    TenantAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
+
+		if (tenants.size() == CLIENT_KNOBS->TOO_MANY) {
+			TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
+			    .detail("BeginTenant", beginTenant)
+			    .detail("EndTenant", endTenant);
+			ryw->setSpecialKeySpaceErrorMsg(
+			    ManagementAPIError::toJsonString(false, "delete tenants", "too many tenants to range delete"));
+			throw special_keys_api_failure();
+		}
+
+		std::vector<Future<Void>> deleteFutures;
+		for (auto tenant : tenants) {
+			deleteFutures.push_back(TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenant.first));
+		}
+		wait(waitForAll(deleteFutures));
+
+		return Void();
+	}
+
+public:
+	const static KeyRangeRef submoduleRange;
+	const static KeyRangeRef mapSubRange;
+
+	explicit TenantRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
+
+	Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
+	                             KeyRangeRef kr,
+	                             GetRangeLimits limitsHint) const override {
+		return getTenantRange(ryw, kr, limitsHint);
+	}
+
+	Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override {
+		auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
+		std::vector<Future<Void>> tenantManagementFutures;
+
+		std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
+
+		for (auto range : ranges) {
+			if (!range.value().first) {
+				continue;
+			}
+
+			KeyRangeRef adjustedRange =
+			    range.range()
+			        .removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
+			        .removePrefix(submoduleRange.begin);
+
+			if (mapSubRange.intersects(adjustedRange)) {
+				adjustedRange = mapSubRange & adjustedRange;
+				adjustedRange = removePrefix(adjustedRange, mapSubRange.begin, "\xff"_sr);
+				mapMutations.push_back(std::make_pair(adjustedRange, range.value().second));
+			}
+		}
+
+		std::vector<TenantNameRef> tenantsToCreate;
+		for (auto mapMutation : mapMutations) {
+			TenantNameRef tenantName = mapMutation.first.begin;
+			if (mapMutation.second.present()) {
+				tenantsToCreate.push_back(tenantName);
+			} else {
+				// For a single key clear, just issue the delete
+				if (mapMutation.first.singleKeyRange()) {
+					tenantManagementFutures.push_back(
+					    TenantAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
+				} else {
+					tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, mapMutation.first.end));
+				}
+			}
+		}
+
+		if (!tenantsToCreate.empty()) {
+			tenantManagementFutures.push_back(createTenants(ryw, tenantsToCreate));
+		}
+
+		return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
+	}
+};
+
+#include "flow/unactorcompiler.h"
+#endif
\ No newline at end of file
diff --git a/fdbserver/workloads/TenantManagementWorkload.actor.cpp b/fdbserver/workloads/TenantManagementWorkload.actor.cpp
index 5bb2fa5573..6cd21f13f7 100644
--- a/fdbserver/workloads/TenantManagementWorkload.actor.cpp
+++ b/fdbserver/workloads/TenantManagementWorkload.actor.cpp
@@ -22,6 +22,7 @@
 #include <limits>
 #include "fdbclient/FDBOptions.g.h"
 #include "fdbclient/TenantManagement.actor.h"
+#include "fdbclient/TenantSpecialKeys.actor.h"
 #include "fdbrpc/simulator.h"
 #include "fdbserver/workloads/workloads.actor.h"
 #include "fdbserver/Knobs.h"
@@ -49,8 +50,9 @@ struct TenantManagementWorkload : TestWorkload {
 	const TenantName tenantNamePrefix = "tenant_management_workload_"_sr;
 	TenantName localTenantNamePrefix;
 
-	const Key specialKeysTenantMapPrefix = TenantMapRangeImpl::submoduleRange.begin.withPrefix(
-	    SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
+	const Key specialKeysTenantMapPrefix =
+	    TenantRangeImpl<true>::mapSubRange.begin.withPrefix(TenantRangeImpl<true>::submoduleRange.begin.withPrefix(
+	        SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin));
 
 	int maxTenants;
 	double testDuration;

From aff21f062f304c9e618292363d4421d7e705f033 Mon Sep 17 00:00:00 2001
From: "A.J. Beamon" <aj.beamon@snowflake.com>
Date: Fri, 1 Jul 2022 09:50:05 -0700
Subject: [PATCH 2/2] Fix indentation and refactor prefix construction to be in
 forward order

---
 .../src/main/com/apple/foundationdb/TenantManagement.java   | 2 +-
 fdbserver/workloads/TenantManagementWorkload.actor.cpp      | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java b/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
index a1de1b5d8e..12aaf70322 100644
--- a/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
+++ b/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
@@ -41,7 +41,7 @@ import com.apple.foundationdb.tuple.Tuple;
  */
 public class TenantManagement {
 	static byte[] TENANT_MAP_PREFIX = ByteArrayUtil.join(new byte[] { (byte)255, (byte)255 },
-														 "/management/tenant/map/".getBytes());
+	                                                     "/management/tenant/map/".getBytes());
 
 	/**
 	 * Creates a new tenant in the cluster. If the tenant already exists, this operation will complete
diff --git a/fdbserver/workloads/TenantManagementWorkload.actor.cpp b/fdbserver/workloads/TenantManagementWorkload.actor.cpp
index 6cd21f13f7..6453f18e97 100644
--- a/fdbserver/workloads/TenantManagementWorkload.actor.cpp
+++ b/fdbserver/workloads/TenantManagementWorkload.actor.cpp
@@ -50,9 +50,9 @@ struct TenantManagementWorkload : TestWorkload {
 	const TenantName tenantNamePrefix = "tenant_management_workload_"_sr;
 	TenantName localTenantNamePrefix;
 
-	const Key specialKeysTenantMapPrefix =
-	    TenantRangeImpl<true>::mapSubRange.begin.withPrefix(TenantRangeImpl<true>::submoduleRange.begin.withPrefix(
-	        SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin));
+	const Key specialKeysTenantMapPrefix = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT)
+	                                           .begin.withSuffix(TenantRangeImpl<true>::submoduleRange.begin)
+	                                           .withSuffix(TenantRangeImpl<true>::mapSubRange.begin);
 
 	int maxTenants;
 	double testDuration;