diff --git a/bindings/python/tests/fdbcli_tests.py b/bindings/python/tests/fdbcli_tests.py
index bb2746df13..d24ddb876f 100755
--- a/bindings/python/tests/fdbcli_tests.py
+++ b/bindings/python/tests/fdbcli_tests.py
@@ -542,6 +542,103 @@ def triggerddteaminfolog(logger):
     output = run_fdbcli_command('triggerddteaminfolog')
     assert output == 'Triggered team info logging in data distribution.'
 
+@enable_logging()
+def tenants(logger):
+    output = run_fdbcli_command('listtenants')
+    assert output == 'The cluster has no tenants'
+
+    output = run_fdbcli_command('createtenant tenant')
+    assert output == 'The tenant `tenant\' has been created'
+
+    output = run_fdbcli_command('createtenant tenant2')
+    assert output == 'The tenant `tenant2\' has been created'
+
+    output = run_fdbcli_command('listtenants')
+    assert output == '1. tenant\n  2. tenant2'
+
+    output = run_fdbcli_command('listtenants a z 1')
+    assert output == '1. tenant'
+
+    output = run_fdbcli_command('listtenants a tenant2')
+    assert output == '1. tenant'
+
+    output = run_fdbcli_command('listtenants tenant2 z')
+    assert output == '1. tenant2'
+
+    output = run_fdbcli_command('gettenant tenant')
+    lines = output.split('\n')
+    assert len(lines) == 2
+    assert lines[0].strip().startswith('id: ')
+    assert lines[1].strip().startswith('prefix: ')
+    
+    output = run_fdbcli_command('usetenant')
+    assert output == 'Using the default tenant'
+
+    output = run_fdbcli_command_and_get_error('usetenant tenant3')
+    assert output == 'ERROR: Tenant `tenant3\' does not exist'
+
+    # Test writing keys to different tenants and make sure they all work correctly
+    run_fdbcli_command('writemode on; set tenant_test default_tenant')
+    output = run_fdbcli_command('get tenant_test')
+    assert output == '`tenant_test\' is `default_tenant\''
+
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=fdbcli_env)
+    cmd_sequence = ['writemode on', 'usetenant tenant', 'get tenant_test', 'set tenant_test tenant']
+    output, _ = process.communicate(input='\n'.join(cmd_sequence).encode())
+
+    lines = output.decode().strip().split('\n')[-3:]
+    assert lines[0] == 'Using tenant `tenant\''
+    assert lines[1] == '`tenant_test\': not found'
+    assert lines[2].startswith('Committed')
+
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=fdbcli_env)
+    cmd_sequence = ['writemode on', 'usetenant tenant2', 'get tenant_test', 'set tenant_test tenant2', 'get tenant_test']
+    output, _ = process.communicate(input='\n'.join(cmd_sequence).encode())
+
+    lines = output.decode().strip().split('\n')[-4:]
+    assert lines[0] == 'Using tenant `tenant2\''
+    assert lines[1] == '`tenant_test\': not found'
+    assert lines[2].startswith('Committed')
+    assert lines[3] == '`tenant_test\' is `tenant2\''
+
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=fdbcli_env)
+    cmd_sequence = ['usetenant tenant', 'get tenant_test', 'defaulttenant', 'get tenant_test']
+    output, _ = process.communicate(input='\n'.join(cmd_sequence).encode())
+
+    lines = output.decode().strip().split('\n')[-4:]
+    assert lines[0] == 'Using tenant `tenant\''
+    assert lines[1] == '`tenant_test\' is `tenant\''
+    assert lines[2] == 'Using the default tenant'
+    assert lines[3] == '`tenant_test\' is `default_tenant\''
+
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fdbcli_env)
+    cmd_sequence = ['writemode on', 'usetenant tenant', 'clear tenant_test', 'deletetenant tenant', 'get tenant_test', 'defaulttenant', 'usetenant tenant']
+    output, error_output = process.communicate(input='\n'.join(cmd_sequence).encode())
+
+    lines = output.decode().strip().split('\n')[-7:]
+    error_lines = error_output.decode().strip().split('\n')[-2:]
+    assert lines[0] == 'Using tenant `tenant\''
+    assert lines[1].startswith('Committed')
+    assert lines[2] == 'The tenant `tenant\' has been deleted'
+    assert lines[3] == 'WARNING: the active tenant was deleted. Use the `usetenant\' or `defaulttenant\''
+    assert lines[4] == 'command to choose a new tenant.'
+    assert error_lines[0] == 'ERROR: Tenant does not exist (2131)'
+    assert lines[6] == 'Using the default tenant'
+    assert error_lines[1] == 'ERROR: Tenant `tenant\' does not exist'
+
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fdbcli_env)
+    cmd_sequence = ['writemode on', 'deletetenant tenant2', 'usetenant tenant2', 'clear tenant_test', 'defaulttenant', 'deletetenant tenant2']
+    output, error_output = process.communicate(input='\n'.join(cmd_sequence).encode())
+
+    lines = output.decode().strip().split('\n')[-4:]
+    error_lines = error_output.decode().strip().split('\n')[-1:]
+    assert error_lines[0] == 'ERROR: Cannot delete a non-empty tenant (2133)'
+    assert lines[0] == 'Using tenant `tenant2\''
+    assert lines[1].startswith('Committed')
+    assert lines[2] == 'Using the default tenant'
+    assert lines[3] == 'The tenant `tenant2\' has been deleted'
+
+    run_fdbcli_command('writemode on; clear tenant_test')
 
 if __name__ == '__main__':
     parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
@@ -586,6 +683,7 @@ if __name__ == '__main__':
         transaction()
         throttle()
         triggerddteaminfolog()
+        tenants()
     else:
         assert args.process_number > 1, "Process number should be positive"
         coordinators()
diff --git a/documentation/sphinx/source/command-line-interface.rst b/documentation/sphinx/source/command-line-interface.rst
index 6d2243f02f..72d7da54ef 100644
--- a/documentation/sphinx/source/command-line-interface.rst
+++ b/documentation/sphinx/source/command-line-interface.rst
@@ -153,6 +153,27 @@ If ``description=<DESC>`` is specified, the description field in the cluster fil
 
 For more information on setting the cluster description, see :ref:`configuration-setting-cluster-description`.
 
+createtenant
+------------
+
+The ``createtenant`` command is used to create new tenants in the cluster. Its syntax is ``createtenant <TENANT_NAME>``.
+
+The tenant name can be any byte string that does not begin with the ``\xff`` byte. If the tenant already exists, ``fdbcli`` will report an error.
+
+defaulttenant
+-------------
+
+The ``defaulttenant`` command configures ``fdbcli`` to run its commands without a tenant. This is the default behavior.
+
+The active tenant cannot be changed while a transaction (using ``begin``) is open.
+
+deletetenant
+------------
+
+The ``deletetenant`` command is used to delete tenants from the cluster. Its syntax is ``deletetenant <TENANT_NAME>``.
+
+In order to delete a tenant, it must be empty. To delete a tenant with data, first clear that data using the ``clear`` command. If the tenant does not exist, ``fdbcli`` will report an error.
+
 exclude
 -------
 
@@ -210,6 +231,13 @@ The ``getrangekeys`` command fetches keys in a range. Its syntax is ``getrangeke
 
 Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (or values) in ``fdbcli``.
 
+gettenant
+---------
+
+The ``gettenant`` command fetches metadata for a given tenant and displays it. Its syntax is ``gettenant <TENANT_NAME>``.
+
+Included in the output of this command are the ``id`` and ``prefix`` assigned to the tenant. If the tenant does not exist, ``fdbcli`` will report an error.
+
 getversion
 ----------
 
@@ -300,6 +328,13 @@ Attempts to kill all specified processes. Each address should include the IP and
 
 Attempts to kill all known processes in the cluster.
 
+listtenants
+-----------
+
+The ``listtenants`` command prints the names of tenants in the cluster. Its syntax is ``listtenants [BEGIN] [END] [LIMIT]``.
+
+By default, the ``listtenants`` command will print up to 100 entries from the entire range of tenants. A narrower sub-range can be printed using the optional ``[BEGIN]`` and ``[END]`` parameters, and the limit can be changed by specifying an integer ``[LIMIT]`` parameter.
+
 lock
 ----
 
@@ -512,6 +547,17 @@ unlock
 
 The ``unlock`` command unlocks the database with the specified lock UID. Because this is a potentially dangerous operation, users must copy a passphrase before the unlock command is executed.
 
+usetenant
+---------
+
+The ``usetenant`` command configures ``fdbcli`` to run transactions within the specified tenant. Its syntax is ``usetenant <TENANT_NAME>``.
+
+When configured, transactions will read and write keys from the key-space associated with the specified tenant. By default, ``fdbcli`` runs without a tenant. Management operations that modify keys (e.g. ``exclude``) will not operate within the tenant.
+
+If the tenant chosen does not exist, ``fdbcli`` will report an error.
+
+The active tenant cannot be changed while a transaction (using ``begin``) is open.
+
 writemode
 ---------
 
diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp
index 33a34e7e7b..97bfc98872 100644
--- a/fdbcli/BlobRangeCommand.actor.cpp
+++ b/fdbcli/BlobRangeCommand.actor.cpp
@@ -62,12 +62,27 @@ ACTOR Future<Void> setBlobRange(Database db, Key startKey, Key endKey, Value val
 
 namespace fdb_cli {
 
-ACTOR Future<bool> blobRangeCommandActor(Database localDb, std::vector<StringRef> tokens) {
+ACTOR Future<bool> blobRangeCommandActor(Database localDb,
+                                         Optional<TenantMapEntry> tenantEntry,
+                                         std::vector<StringRef> tokens) {
 	// enables blob writing for the given range
 	if (tokens.size() != 4) {
 		printUsage(tokens[0]);
 		return false;
-	} else if (tokens[3] > LiteralStringRef("\xff")) {
+	}
+
+	Key begin;
+	Key end;
+
+	if (tenantEntry.present()) {
+		begin = tokens[2].withPrefix(tenantEntry.get().prefix);
+		end = tokens[3].withPrefix(tenantEntry.get().prefix);
+	} else {
+		begin = tokens[2];
+		end = tokens[3];
+	}
+
+	if (end > LiteralStringRef("\xff")) {
 		// TODO is this something we want?
 		printf("Cannot blobbify system keyspace! Problematic End Key: %s\n", tokens[3].printable().c_str());
 		return false;
@@ -78,12 +93,12 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb, std::vector<StringRef
 			printf("Starting blobbify range for [%s - %s)\n",
 			       tokens[2].printable().c_str(),
 			       tokens[3].printable().c_str());
-			wait(setBlobRange(localDb, tokens[2], tokens[3], LiteralStringRef("1")));
+			wait(setBlobRange(localDb, begin, end, LiteralStringRef("1")));
 		} else if (tokencmp(tokens[1], "stop")) {
 			printf("Stopping blobbify range for [%s - %s)\n",
 			       tokens[2].printable().c_str(),
 			       tokens[3].printable().c_str());
-			wait(setBlobRange(localDb, tokens[2], tokens[3], StringRef()));
+			wait(setBlobRange(localDb, begin, end, StringRef()));
 		} else {
 			printUsage(tokens[0]);
 			printf("Usage: blobrange <start|stop> <startkey> <endkey>");
diff --git a/fdbcli/CMakeLists.txt b/fdbcli/CMakeLists.txt
index c54338fa95..29e831f859 100644
--- a/fdbcli/CMakeLists.txt
+++ b/fdbcli/CMakeLists.txt
@@ -24,6 +24,7 @@ set(FDBCLI_SRCS
   SnapshotCommand.actor.cpp
   StatusCommand.actor.cpp
   SuspendCommand.actor.cpp
+  TenantCommands.actor.cpp
   ThrottleCommand.actor.cpp
   TriggerDDTeamInfoLogCommand.actor.cpp
   TssqCommand.actor.cpp
diff --git a/fdbcli/ChangeFeedCommand.actor.cpp b/fdbcli/ChangeFeedCommand.actor.cpp
index 0cebc508ec..796be8f337 100644
--- a/fdbcli/ChangeFeedCommand.actor.cpp
+++ b/fdbcli/ChangeFeedCommand.actor.cpp
@@ -75,7 +75,10 @@ ACTOR Future<Void> requestVersionUpdate(Database localDb, Reference<ChangeFeedDa
 	}
 }
 
-ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRef> tokens, Future<Void> warn) {
+ACTOR Future<bool> changeFeedCommandActor(Database localDb,
+                                          Optional<TenantMapEntry> tenantEntry,
+                                          std::vector<StringRef> tokens,
+                                          Future<Void> warn) {
 	if (tokens.size() == 1) {
 		printUsage(tokens[0]);
 		return false;
@@ -92,8 +95,15 @@ ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRe
 			printUsage(tokens[0]);
 			return false;
 		}
-		wait(updateChangeFeed(
-		    localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_CREATE, KeyRangeRef(tokens[3], tokens[4])));
+
+		KeyRange range;
+		if (tenantEntry.present()) {
+			range = KeyRangeRef(tokens[3], tokens[4]).withPrefix(tenantEntry.get().prefix);
+		} else {
+			range = KeyRangeRef(tokens[3], tokens[4]);
+		}
+
+		wait(updateChangeFeed(localDb, tokens[2], ChangeFeedStatus::CHANGE_FEED_CREATE, range));
 	} else if (tokencmp(tokens[1], "stop")) {
 		if (tokens.size() != 3) {
 			printUsage(tokens[0]);
diff --git a/fdbcli/ConfigureCommand.actor.cpp b/fdbcli/ConfigureCommand.actor.cpp
index 3570f8dbd2..03368a6f9e 100644
--- a/fdbcli/ConfigureCommand.actor.cpp
+++ b/fdbcli/ConfigureCommand.actor.cpp
@@ -264,7 +264,8 @@ CommandFactory configureFactory(
         "<single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree-beta|proxies=<PROXIES>|"
         "commit_proxies=<COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*|"
         "count=<TSS_COUNT>|perpetual_storage_wiggle=<WIGGLE_SPEED>|perpetual_storage_wiggle_locality="
-        "<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>|storage_migration_type={disabled|gradual|aggressive}",
+        "<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>|storage_migration_type={disabled|gradual|aggressive}"
+        "|tenant_mode={disabled|optional_experimental|required_experimental}",
         "change the database configuration",
         "The `new' option, if present, initializes a new database with the given configuration rather than changing "
         "the configuration of an existing one. When used, both a redundancy mode and a storage engine must be "
@@ -295,6 +296,10 @@ CommandFactory configureFactory(
         "perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>: Set the process filter for wiggling. "
         "The processes that match the given locality key and locality value are only wiggled. The value 0 will disable "
         "the locality filter and matches all the processes for wiggling.\n\n"
+        "tenant_mode=<disabled|optional_experimental|required_experimental>: Sets the tenant mode for the cluster. If "
+        "optional, then transactions can be run with or without specifying tenants. If required, all data must be "
+        "accessed using tenants.\n\n"
+
         "See the FoundationDB Administration Guide for more information."));
 
 } // namespace fdb_cli
diff --git a/fdbcli/TenantCommands.actor.cpp b/fdbcli/TenantCommands.actor.cpp
new file mode 100644
index 0000000000..c03bb17c88
--- /dev/null
+++ b/fdbcli/TenantCommands.actor.cpp
@@ -0,0 +1,249 @@
+/*
+ * TenantCommands.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbcli/fdbcli.actor.h"
+
+#include "fdbclient/FDBOptions.g.h"
+#include "fdbclient/IClientApi.h"
+#include "fdbclient/Knobs.h"
+#include "fdbclient/ManagementAPI.actor.h"
+#include "fdbclient/Schemas.h"
+
+#include "flow/Arena.h"
+#include "flow/FastRef.h"
+#include "flow/ThreadHelper.actor.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+namespace fdb_cli {
+
+const KeyRangeRef tenantSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant_map/"),
+                                        LiteralStringRef("\xff\xff/management/tenant_map0"));
+
+// createtenant command
+ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
+	if (tokens.size() != 2) {
+		printUsage(tokens[0]);
+		return false;
+	}
+
+	state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
+	state Reference<ITransaction> tr = db->createTransaction();
+	state bool doneExistenceCheck = false;
+
+	loop {
+		tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
+		try {
+			if (!doneExistenceCheck) {
+				Optional<Value> existingTenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey)));
+				if (existingTenant.present()) {
+					throw tenant_already_exists();
+				}
+				doneExistenceCheck = true;
+			}
+
+			tr->set(tenantNameKey, ValueRef());
+			wait(safeThreadFutureToFuture(tr->commit()));
+			break;
+		} catch (Error& e) {
+			state Error err(e);
+			if (e.code() == error_code_special_keys_api_failure) {
+				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				fprintf(stderr, "ERROR: %s\n", errorMsgStr.c_str());
+				return false;
+			}
+			wait(safeThreadFutureToFuture(tr->onError(err)));
+		}
+	}
+
+	printf("The tenant `%s' has been created\n", printable(tokens[1]).c_str());
+	return true;
+}
+
+CommandFactory createTenantFactory("createtenant",
+                                   CommandHelp("createtenant <TENANT_NAME>",
+                                               "creates a new tenant in the cluster",
+                                               "Creates a new tenant in the cluster with the specified name."));
+
+// deletetenant command
+ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
+	if (tokens.size() != 2) {
+		printUsage(tokens[0]);
+		return false;
+	}
+
+	state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
+	state Reference<ITransaction> tr = db->createTransaction();
+	state bool doneExistenceCheck = false;
+
+	loop {
+		tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
+		try {
+			if (!doneExistenceCheck) {
+				Optional<Value> existingTenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey)));
+				if (!existingTenant.present()) {
+					throw tenant_not_found();
+				}
+				doneExistenceCheck = true;
+			}
+
+			tr->clear(tenantNameKey);
+			wait(safeThreadFutureToFuture(tr->commit()));
+			break;
+		} catch (Error& e) {
+			state Error err(e);
+			if (e.code() == error_code_special_keys_api_failure) {
+				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				fprintf(stderr, "ERROR: %s\n", errorMsgStr.c_str());
+				return false;
+			}
+			wait(safeThreadFutureToFuture(tr->onError(err)));
+		}
+	}
+
+	printf("The tenant `%s' has been deleted\n", printable(tokens[1]).c_str());
+	return true;
+}
+
+CommandFactory deleteTenantFactory(
+    "deletetenant",
+    CommandHelp(
+        "deletetenant <TENANT_NAME>",
+        "deletes a tenant from the cluster",
+        "Deletes a tenant from the cluster. Deletion will be allowed only if the specified tenant contains no data."));
+
+// listtenants command
+ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
+	if (tokens.size() > 4) {
+		printUsage(tokens[0]);
+		return false;
+	}
+
+	StringRef beginTenant = ""_sr;
+	StringRef endTenant = "\xff\xff"_sr;
+	state int limit = 100;
+
+	if (tokens.size() >= 2) {
+		beginTenant = tokens[1];
+	}
+	if (tokens.size() >= 3) {
+		endTenant = tokens[2];
+		if (endTenant <= beginTenant) {
+			fprintf(stderr, "ERROR: end must be larger than begin");
+			return false;
+		}
+	}
+	if (tokens.size() == 4) {
+		int n = 0;
+		if (sscanf(tokens[3].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[3].size()) {
+			fprintf(stderr, "ERROR: invalid limit %s\n", tokens[3].toString().c_str());
+			return false;
+		}
+	}
+
+	state Key beginTenantKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(beginTenant);
+	state Key endTenantKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(endTenant);
+	state Reference<ITransaction> tr = db->createTransaction();
+
+	loop {
+		try {
+			RangeResult tenants = wait(safeThreadFutureToFuture(
+			    tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit)));
+
+			if (tenants.empty()) {
+				if (tokens.size() == 1) {
+					printf("The cluster has no tenants\n");
+				} else {
+					printf("The cluster has no tenants in the specified range\n");
+				}
+			}
+
+			int index = 0;
+			for (auto tenant : tenants) {
+				printf("  %d. %s\n",
+				       ++index,
+				       printable(tenant.key.removePrefix(fdb_cli::tenantSpecialKeyRange.begin)).c_str());
+			}
+
+			return true;
+		} catch (Error& e) {
+			state Error err(e);
+			if (e.code() == error_code_special_keys_api_failure) {
+				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				fprintf(stderr, "ERROR: %s\n", errorMsgStr.c_str());
+				return false;
+			}
+			wait(safeThreadFutureToFuture(tr->onError(err)));
+		}
+	}
+}
+
+CommandFactory listTenantsFactory(
+    "listtenants",
+    CommandHelp("listtenants [BEGIN] [END] [LIMIT]",
+                "print a list of tenants in the cluster",
+                "Print a list of tenants in the cluster. Only tenants in the range [BEGIN] - [END] will be printed. "
+                "The number of tenants to print can be specified using the [LIMIT] parameter, which defaults to 100."));
+
+// gettenant command
+ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
+	if (tokens.size() != 2) {
+		printUsage(tokens[0]);
+		return false;
+	}
+
+	state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
+	state Reference<ITransaction> tr = db->createTransaction();
+
+	loop {
+		try {
+			Optional<Value> tenant = wait(safeThreadFutureToFuture(tr->get(tenantNameKey)));
+			if (!tenant.present()) {
+				throw tenant_not_found();
+			}
+
+			json_spirit::mValue jsonObject;
+			json_spirit::read_string(tenant.get().toString(), jsonObject);
+			JSONDoc doc(jsonObject);
+
+			int64_t id;
+			std::string prefix;
+			doc.get("id", id);
+			doc.get("prefix", prefix);
+
+			printf("  id: %" PRId64 "\n", id);
+			printf("  prefix: %s\n", printable(prefix).c_str());
+			return true;
+		} catch (Error& e) {
+			state Error err(e);
+			if (e.code() == error_code_special_keys_api_failure) {
+				std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
+				fprintf(stderr, "ERROR: %s\n", errorMsgStr.c_str());
+				return false;
+			}
+			wait(safeThreadFutureToFuture(tr->onError(err)));
+		}
+	}
+}
+
+CommandFactory getTenantFactory("gettenant",
+                                CommandHelp("gettenant <TENANT_NAME>",
+                                            "prints the metadata for a tenant",
+                                            "Prints the metadata for a tenant."));
+} // namespace fdb_cli
diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp
index 069692346f..7b6b4b9834 100644
--- a/fdbcli/fdbcli.actor.cpp
+++ b/fdbcli/fdbcli.actor.cpp
@@ -55,6 +55,7 @@
 #include "fdbcli/fdbcli.actor.h"
 
 #include <cinttypes>
+#include <cstdio>
 #include <type_traits>
 #include <signal.h>
 
@@ -530,6 +531,19 @@ void initHelp() {
 	helpMap["writemode"] = CommandHelp("writemode <on|off>",
 	                                   "enables or disables sets and clears",
 	                                   "Setting or clearing keys from the CLI is not recommended.");
+	helpMap["usetenant"] =
+	    CommandHelp("usetenant [NAME]",
+	                "prints or configures the tenant used for transactions",
+	                "If no name is given, prints the name of the current active tenant. Otherwise, all commands that "
+	                "are used to read or write keys are done inside the tenant with the specified NAME. By default, "
+	                "no tenant is configured and operations are performed on the raw key-space. The tenant cannot be "
+	                "configured while a transaction started with `begin' is open.");
+	helpMap["defaulttenant"] =
+	    CommandHelp("defaulttenant",
+	                "configures transactions to not use a named tenant",
+	                "All commands that are used to read or write keys will be done without a tenant and will operate "
+	                "on the raw key-space. This is the default behavior. The tenant cannot be configured while a "
+	                "transaction started with `begin' is open.");
 }
 
 void printVersion() {
@@ -670,13 +684,18 @@ ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens) {
 
 // TODO: Update the function to get rid of the Database after refactoring
 Reference<ITransaction> getTransaction(Reference<IDatabase> db,
+                                       Reference<ITenant> tenant,
                                        Reference<ITransaction>& tr,
                                        FdbOptions* options,
                                        bool intrans) {
 	// Update "tr" to point to a brand new transaction object when it's not initialized or "intrans" flag is "false",
 	// which indicates we need a new transaction object
 	if (!tr || !intrans) {
-		tr = db->createTransaction();
+		if (tenant) {
+			tr = tenant->createTransaction();
+		} else {
+			tr = db->createTransaction();
+		}
 		options->apply(tr);
 	}
 
@@ -769,7 +788,8 @@ void configureGenerator(const char* text, const char* line, std::vector<std::str
 		                   "resolvers=",
 		                   "perpetual_storage_wiggle=",
 		                   "perpetual_storage_wiggle_locality=",
-		                   "storage_migration_type=",
+		                   "storage_migration_type="
+		                   "tenant_mode=",
 		                   nullptr };
 	arrayGenerator(text, line, opts, lc);
 }
@@ -1152,6 +1172,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 
 	state Database localDb;
 	state Reference<IDatabase> db;
+	state Reference<ITenant> tenant;
+	state Optional<Standalone<StringRef>> tenantName;
+	state Optional<TenantMapEntry> tenantEntry;
+
+	// This tenant is kept empty for operations that perform management tasks (e.g. killing a process)
+	state const Reference<ITenant> managementTenant;
+
 	state Reference<ITransaction> tr;
 	state Transaction trx;
 
@@ -1212,7 +1239,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 	// The 3.0 timeout is a guard to avoid waiting forever when the cli cannot talk to any coordinators
 	loop {
 		try {
-			getTransaction(db, tr, options, intrans);
+			getTransaction(db, managementTenant, tr, options, intrans);
 			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
 			wait(delay(3.0) || success(safeThreadFutureToFuture(tr->getReadVersion())));
 			break;
@@ -1373,8 +1400,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "waitopen")) {
-					wait(makeInterruptable(
-					    success(safeThreadFutureToFuture(getTransaction(db, tr, options, intrans)->getReadVersion()))));
+					wait(makeInterruptable(success(safeThreadFutureToFuture(
+					    getTransaction(db, managementTenant, tr, options, intrans)->getReadVersion()))));
 					continue;
 				}
 
@@ -1478,14 +1505,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "changefeed")) {
-					bool _result = wait(makeInterruptable(changeFeedCommandActor(localDb, tokens, warn)));
+					bool _result = wait(makeInterruptable(changeFeedCommandActor(localDb, tenantEntry, tokens, warn)));
 					if (!_result)
 						is_error = true;
 					continue;
 				}
 
 				if (tokencmp(tokens[0], "blobrange")) {
-					bool _result = wait(makeInterruptable(blobRangeCommandActor(localDb, tokens)));
+					bool _result = wait(makeInterruptable(blobRangeCommandActor(localDb, tenantEntry, tokens)));
 					if (!_result)
 						is_error = true;
 					continue;
@@ -1536,7 +1563,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 					} else {
 						activeOptions = FdbOptions(globalOptions);
 						options = &activeOptions;
-						getTransaction(db, tr, options, false);
+						getTransaction(db, tenant, tr, options, false);
 						intrans = true;
 						printf("Transaction started\n");
 					}
@@ -1597,7 +1624,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 						is_error = true;
 					} else {
 						state ThreadFuture<Optional<Value>> valueF =
-						    getTransaction(db, tr, options, intrans)->get(tokens[1]);
+						    getTransaction(db, tenant, tr, options, intrans)->get(tokens[1]);
 						Optional<Standalone<StringRef>> v = wait(makeInterruptable(safeThreadFutureToFuture(valueF)));
 
 						if (v.present())
@@ -1613,8 +1640,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 						printUsage(tokens[0]);
 						is_error = true;
 					} else {
-						Version v = wait(makeInterruptable(
-						    safeThreadFutureToFuture(getTransaction(db, tr, options, intrans)->getReadVersion())));
+						Version v = wait(makeInterruptable(safeThreadFutureToFuture(
+						    getTransaction(db, tenant, tr, options, intrans)->getReadVersion())));
 						fmt::print("{}\n", v);
 					}
 					continue;
@@ -1628,7 +1655,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "kill")) {
-					getTransaction(db, tr, options, intrans);
+					getTransaction(db, managementTenant, tr, options, intrans);
 					bool _result = wait(makeInterruptable(killCommandActor(db, tr, tokens, &address_interface)));
 					if (!_result)
 						is_error = true;
@@ -1636,7 +1663,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "suspend")) {
-					getTransaction(db, tr, options, intrans);
+					getTransaction(db, managementTenant, tr, options, intrans);
 					bool _result = wait(makeInterruptable(suspendCommandActor(db, tr, tokens, &address_interface)));
 					if (!_result)
 						is_error = true;
@@ -1658,7 +1685,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "consistencycheck")) {
-					getTransaction(db, tr, options, intrans);
+					getTransaction(db, managementTenant, tr, options, intrans);
 					bool _result = wait(makeInterruptable(consistencyCheckCommandActor(tr, tokens, intrans)));
 					if (!_result)
 						is_error = true;
@@ -1666,7 +1693,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "profile")) {
-					getTransaction(db, tr, options, intrans);
+					getTransaction(db, managementTenant, tr, options, intrans);
 					bool _result = wait(makeInterruptable(profileCommandActor(tr, tokens, intrans)));
 					if (!_result)
 						is_error = true;
@@ -1674,7 +1701,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 				}
 
 				if (tokencmp(tokens[0], "expensive_data_check")) {
-					getTransaction(db, tr, options, intrans);
+					getTransaction(db, managementTenant, tr, options, intrans);
 					bool _result =
 					    wait(makeInterruptable(expensiveDataCheckCommandActor(db, tr, tokens, &address_interface)));
 					if (!_result)
@@ -1734,8 +1761,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 							endKey = strinc(tokens[1]);
 						}
 
-						state ThreadFuture<RangeResult> kvsF =
-						    getTransaction(db, tr, options, intrans)->getRange(KeyRangeRef(tokens[1], endKey), limit);
+						getTransaction(db, tenant, tr, options, intrans);
+						state ThreadFuture<RangeResult> kvsF = tr->getRange(KeyRangeRef(tokens[1], endKey), limit);
 						RangeResult kvs = wait(makeInterruptable(safeThreadFutureToFuture(kvsF)));
 
 						printf("\nRange limited to %d keys\n", limit);
@@ -1779,7 +1806,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 						printUsage(tokens[0]);
 						is_error = true;
 					} else {
-						getTransaction(db, tr, options, intrans);
+						getTransaction(db, tenant, tr, options, intrans);
 						tr->set(tokens[1], tokens[2]);
 
 						if (!intrans) {
@@ -1800,7 +1827,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 						printUsage(tokens[0]);
 						is_error = true;
 					} else {
-						getTransaction(db, tr, options, intrans);
+						getTransaction(db, tenant, tr, options, intrans);
 						tr->clear(tokens[1]);
 
 						if (!intrans) {
@@ -1821,7 +1848,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 						printUsage(tokens[0]);
 						is_error = true;
 					} else {
-						getTransaction(db, tr, options, intrans);
+						getTransaction(db, tenant, tr, options, intrans);
 						tr->clear(KeyRangeRef(tokens[1], tokens[2]));
 
 						if (!intrans) {
@@ -1910,6 +1937,86 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 					continue;
 				}
 
+				if (tokencmp(tokens[0], "usetenant")) {
+					if (tokens.size() > 2) {
+						printUsage(tokens[0]);
+						is_error = true;
+					} else if (intrans && tokens.size() == 2) {
+						fprintf(stderr, "ERROR: Tenant cannot be changed while a transaction is open\n");
+						is_error = true;
+					} else if (tokens.size() == 1) {
+						if (!tenantName.present()) {
+							printf("Using the default tenant\n");
+						} else {
+							printf("Using tenant `%s'\n", printable(tenantName.get()).c_str());
+						}
+					} else {
+						Optional<TenantMapEntry> entry =
+						    wait(makeInterruptable(ManagementAPI::tryGetTenant(db, tokens[1])));
+						if (!entry.present()) {
+							fprintf(stderr, "ERROR: Tenant `%s' does not exist\n", printable(tokens[1]).c_str());
+							is_error = true;
+						} else {
+							tenant = db->openTenant(tokens[1]);
+							tenantName = tokens[1];
+							tenantEntry = entry;
+							printf("Using tenant `%s'\n", printable(tokens[1]).c_str());
+						}
+					}
+
+					continue;
+				}
+
+				if (tokencmp(tokens[0], "defaulttenant")) {
+					if (tokens.size() != 1) {
+						printUsage(tokens[0]);
+						is_error = true;
+					} else if (intrans) {
+						fprintf(stderr, "ERROR: Tenant cannot be changed while a transaction is open\n");
+						is_error = true;
+					} else {
+						tenant = Reference<ITenant>();
+						tenantName = Optional<Standalone<StringRef>>();
+						tenantEntry = Optional<TenantMapEntry>();
+						printf("Using the default tenant\n");
+					}
+
+					continue;
+				}
+
+				if (tokencmp(tokens[0], "createtenant")) {
+					bool _result = wait(makeInterruptable(createTenantCommandActor(db, tokens)));
+					if (!_result)
+						is_error = true;
+					continue;
+				}
+
+				if (tokencmp(tokens[0], "deletetenant")) {
+					bool _result = wait(makeInterruptable(deleteTenantCommandActor(db, tokens)));
+					if (!_result)
+						is_error = true;
+					else if (tenantName.present() && tokens[1] == tenantName.get()) {
+						printAtCol("WARNING: the active tenant was deleted. Use the `usetenant' or `defaulttenant' "
+						           "command to choose a new tenant.\n",
+						           80);
+					}
+					continue;
+				}
+
+				if (tokencmp(tokens[0], "listtenants")) {
+					bool _result = wait(makeInterruptable(listTenantsCommandActor(db, tokens)));
+					if (!_result)
+						is_error = true;
+					continue;
+				}
+
+				if (tokencmp(tokens[0], "gettenant")) {
+					bool _result = wait(makeInterruptable(getTenantCommandActor(db, tokens)));
+					if (!_result)
+						is_error = true;
+					continue;
+				}
+
 				fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
 				is_error = true;
 			}
@@ -1917,8 +2024,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
 			TraceEvent(SevInfo, "CLICommandLog", randomID).detail("Command", line).detail("IsError", is_error);
 
 		} catch (Error& e) {
-			if (e.code() != error_code_actor_cancelled)
+			if (e.code() == error_code_tenant_name_required) {
+				printAtCol("ERROR: tenant name required. Use the `usetenant' command to select a tenant or enable the "
+				           "`RAW_ACCESS' option to read raw keys.",
+				           80,
+				           stderr);
+			} else if (e.code() != error_code_actor_cancelled) {
 				fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
+			}
 			is_error = true;
 			if (intrans) {
 				printf("Rolling back current transaction\n");
diff --git a/fdbcli/fdbcli.actor.h b/fdbcli/fdbcli.actor.h
index 0a8cbf8796..afd48bcb3c 100644
--- a/fdbcli/fdbcli.actor.h
+++ b/fdbcli/fdbcli.actor.h
@@ -138,8 +138,12 @@ ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr,
                                                 bool intrans);
 // coordinators command
 ACTOR Future<bool> coordinatorsCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
+// createtenant command
+ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 // datadistribution command
 ACTOR Future<bool> dataDistributionCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
+// deletetenant command
+ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 // exclude command
 ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens, Future<Void> warn);
 // expensive_data_check command
@@ -155,6 +159,8 @@ ACTOR Future<bool> fileConfigureCommandActor(Reference<IDatabase> db,
                                              bool force);
 // force_recovery_with_data_loss command
 ACTOR Future<bool> forceRecoveryWithDataLossCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
+// gettenant command
+ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 // include command
 ACTOR Future<bool> includeCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 // kill command
@@ -162,13 +168,20 @@ ACTOR Future<bool> killCommandActor(Reference<IDatabase> db,
                                     Reference<ITransaction> tr,
                                     std::vector<StringRef> tokens,
                                     std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface);
+// listtenants command
+ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 // lock/unlock command
 ACTOR Future<bool> lockCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
 ACTOR Future<bool> unlockDatabaseActor(Reference<IDatabase> db, UID uid);
 // changefeed command
-ACTOR Future<bool> changeFeedCommandActor(Database localDb, std::vector<StringRef> tokens, Future<Void> warn);
+ACTOR Future<bool> changeFeedCommandActor(Database localDb,
+                                          Optional<TenantMapEntry> tenantEntry,
+                                          std::vector<StringRef> tokens,
+                                          Future<Void> warn);
 // blobrange command
-ACTOR Future<bool> blobRangeCommandActor(Database localDb, std::vector<StringRef> tokens);
+ACTOR Future<bool> blobRangeCommandActor(Database localDb,
+                                         Optional<TenantMapEntry> tenantEntry,
+                                         std::vector<StringRef> tokens);
 // maintenance command
 ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db, StringRef zoneId, double seconds, bool printWarning = false);
 ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,