Some initial metacluster implementation

This commit is contained in:
A.J. Beamon 2022-05-02 09:32:04 -07:00
parent be0c7a8884
commit 7e860dc92c
28 changed files with 1188 additions and 233 deletions

View File

@ -346,7 +346,7 @@ function createDatabase
# Configure the database.
else
"${BINDIR}/fdbcli" -C "${FDBCONF}" --exec 'configure new single memory tenant_mode=optional_experimental; status' --timeout "${CONFIGUREWAIT}" --log --log-dir "${LOGDIR}" &>> "${LOGDIR}/fdbclient.log"
"${BINDIR}/fdbcli" -C "${FDBCONF}" --exec 'configure new single memory tenant_mode=optional; status' --timeout "${CONFIGUREWAIT}" --log --log-dir "${LOGDIR}" &>> "${LOGDIR}/fdbclient.log"
if ! displayMessage "Checking if config succeeded"
then

View File

@ -64,7 +64,7 @@ The ``commit`` command commits the current transaction. Any sets or clears execu
configure
---------
The ``configure`` command changes the database configuration. Its syntax is ``configure [new|tss] [single|double|triple|three_data_hall|three_datacenter] [ssd|memory] [grv_proxies=<N>] [commit_proxies=<N>] [resolvers=<N>] [logs=<N>] [count=<TSS_COUNT>] [perpetual_storage_wiggle=<WIGGLE_SPEED>] [perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>] [storage_migration_type={disabled|aggressive|gradual}] [tenant_mode={disabled|optional_experimental|required_experimental}]``.
The ``configure`` command changes the database configuration. Its syntax is ``configure [new|tss] [single|double|triple|three_data_hall|three_datacenter] [ssd|memory] [grv_proxies=<N>] [commit_proxies=<N>] [resolvers=<N>] [logs=<N>] [count=<TSS_COUNT>] [perpetual_storage_wiggle=<WIGGLE_SPEED>] [perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>] [storage_migration_type={disabled|aggressive|gradual}] [tenant_mode={disabled|optional|required|management|subordinate}]``.
The ``new`` option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. When ``new`` is used, both a redundancy mode and a storage engine must be specified.

View File

@ -782,8 +782,10 @@
"tenant_mode": {
"$enum":[
"disabled",
"optional_experimental",
"required_experimental"
"optional",
"required",
"management",
"subordinate"
]}
},
"data":{

View File

@ -27,8 +27,10 @@ In order to use tenants, the cluster must be configured with an appropriate tena
FoundationDB clusters support the following tenant modes:
* ``disabled`` - Tenants cannot be created or used. Disabled is the default tenant mode.
* ``optional_experimental`` - Tenants can be created. Each transaction can choose whether or not to use a tenant. This mode is primarily intended for migration and testing purposes, and care should be taken to avoid conflicts between tenant and non-tenant data.
* ``required_experimental`` - Tenants can be created. Each normal transaction must use a tenant. To support special access needs, transactions will be permitted to access the raw key-space using the ``RAW_ACCESS`` transaction option.
* ``optional`` - Tenants can be created. Each transaction can choose whether or not to use a tenant. This mode is primarily intended for migration and testing purposes, and care should be taken to avoid conflicts between tenant and non-tenant data.
* ``required`` - Tenants can be created. Each normal transaction must use a tenant. To support special access needs, transactions will be permitted to access the raw key-space using the ``RAW_ACCESS`` transaction option.
* ``management`` - The cluster is configured as the management cluster in a metacluster. Subordinate data clusters can be registered with this management cluster and created tenants will be assigned to them. This cluster is not allowed to store data.
* ``subordinate`` - The cluster is configured as a data cluster in a metacluster. Like ``required`` mode, except tenants cannot be created and deleted directly on a ``subordinate`` cluster.
Creating and deleting tenants
=============================
@ -51,7 +53,7 @@ All operations performed within a tenant transaction will occur within the tenan
Raw access
----------
When operating in the tenant mode ``required_experimental``, transactions are not ordinarily permitted to run without using a tenant. In order to access the system keys or perform maintenance operations that span multiple tenants, it is required to use the ``RAW_ACCESS`` transaction option to access the global key-space. It is an error to specify ``RAW_ACCESS`` on a transaction that is configured to use a tenant.
When operating in the tenant mode ``required`` or using a metacluster, transactions are not ordinarily permitted to run without using a tenant. In order to access the system keys or perform maintenance operations that span multiple tenants, it is required to use the ``RAW_ACCESS`` transaction option to access the global key-space. It is an error to specify ``RAW_ACCESS`` on a transaction that is configured to use a tenant.
.. note :: Setting the ``READ_SYSTEM_KEYS`` or ``ACCESS_SYSTEM_KEYS`` options implies ``RAW_ACCESS`` for your transaction.

View File

@ -19,6 +19,7 @@ set(FDBCLI_SRCS
LockCommand.actor.cpp
ChangeFeedCommand.actor.cpp
MaintenanceCommand.actor.cpp
MetaclusterCommands.actor.cpp
ProfileCommand.actor.cpp
SetClassCommand.actor.cpp
SnapshotCommand.actor.cpp

View File

@ -317,7 +317,7 @@ CommandFactory configureFactory(
"commit_proxies=<COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*|"
"count=<TSS_COUNT>|perpetual_storage_wiggle=<WIGGLE_SPEED>|perpetual_storage_wiggle_locality="
"<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>|storage_migration_type={disabled|gradual|aggressive}"
"|tenant_mode={disabled|optional_experimental|required_experimental}|blob_granules_enabled={0|1}",
"|tenant_mode={disabled|optional|required|management|subordinate}|blob_granules_enabled={0|1}",
"change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing "
"the configuration of an existing one. When used, both a redundancy mode and a storage engine must be "
@ -348,7 +348,7 @@ CommandFactory configureFactory(
"perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>: Set the process filter for wiggling. "
"The processes that match the given locality key and locality value are only wiggled. The value 0 will disable "
"the locality filter and matches all the processes for wiggling.\n\n"
"tenant_mode=<disabled|optional_experimental|required_experimental>: Sets the tenant mode for the cluster. If "
"tenant_mode=<disabled|optional|required|management|subordinate>: Sets the tenant mode for the cluster. If "
"optional, then transactions can be run with or without specifying tenants. If required, all data must be "
"accessed using tenants.\n\n"

View File

@ -0,0 +1,274 @@
/*
* MetaclusterCommands.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/MetaclusterManagement.actor.h"
#include "fdbclient/Schemas.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
Optional<std::pair<Optional<std::string>, Optional<DataClusterEntry>>>
parseClusterConfiguration(std::vector<StringRef> const& tokens, DataClusterEntry const& defaults, int startIndex) {
Optional<DataClusterEntry> entry;
Optional<std::string> connectionString;
for (int tokenNum = startIndex; tokenNum < tokens.size(); ++tokenNum) {
StringRef token = tokens[tokenNum];
StringRef param = token.eat("=");
std::string value = token.toString();
if (tokencmp(param, "max_tenant_groups")) {
entry = defaults;
int n;
if (sscanf(value.c_str(), "%d%n", &entry.get().capacity.numTenantGroups, &n) != 1 || n != value.size() ||
entry.get().capacity.numTenantGroups < 0) {
fprintf(stderr, "ERROR: invalid number of tenant groups %s\n", value.c_str());
return Optional<std::pair<Optional<std::string>, Optional<DataClusterEntry>>>();
}
} else if (tokencmp(param, "connection_string")) {
connectionString = value;
} else {
fprintf(stderr, "ERROR: unrecognized configuration parameter %s\n", param.toString().c_str());
return Optional<std::pair<Optional<std::string>, Optional<DataClusterEntry>>>();
}
}
return std::make_pair(connectionString, entry);
}
void printMetaclusterConfigureOptionsUsage() {
fmt::print("max_tenant_groups sets the maximum number of tenant groups that can be assigned\n"
"to the named data cluster.\n");
fmt::print("connection_string sets the connection string for the named data cluster.\n");
}
// metacluster register command
ACTOR Future<bool> metaclusterRegisterCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() < 4) {
fmt::print("Usage: metacluster register <NAME> <max_tenant_groups=<NUM_GROUPS>|\n"
"connection_string=<CONNECTION_STRING>> ...\n\n");
fmt::print("Adds a data cluster with the given connection string to a metacluster.\n");
fmt::print("NAME is used to identify the cluster in future commands.\n");
printMetaclusterConfigureOptionsUsage();
return false;
}
DataClusterEntry defaultEntry;
auto config = parseClusterConfiguration(tokens, defaultEntry, 3);
if (!config.present()) {
return false;
} else if (!config.get().first.present()) {
fprintf(stderr, "ERROR: connection_string must be configured when registering a cluster.\n");
return false;
}
wait(MetaclusterAPI::registerCluster(
db, tokens[2], config.get().first.get(), config.get().second.orDefault(defaultEntry)));
printf("The cluster `%s' has been added\n", printable(tokens[2]).c_str());
return true;
}
// metacluster remove command
ACTOR Future<bool> metaclusterRemoveCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() != 3) {
fmt::print("Usage: metacluster remove <NAME> \n\n");
fmt::print("Removes the specified data cluster from a metacluster.\n");
return false;
}
wait(MetaclusterAPI::removeCluster(db, tokens[2]));
printf("The cluster `%s' has been removed\n", printable(tokens[2]).c_str());
return true;
}
// metacluster list command
ACTOR Future<bool> metaclusterListCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() > 6) {
fmt::print("Usage: metacluster list [BEGIN] [END] [LIMIT]\n\n");
fmt::print("Lists the data clusters in a metacluster.\n");
fmt::print("Only cluster names in the range BEGIN - END will be printed.\n");
fmt::print("An optional LIMIT can be specified to limit the number of results (default 100).\n");
return false;
}
state ClusterNameRef begin = tokens.size() > 2 ? tokens[2] : ""_sr;
state ClusterNameRef end = tokens.size() > 3 ? tokens[3] : "\xff"_sr;
int limit = 100;
if (tokens.size() > 4) {
int n = 0;
if (sscanf(tokens[3].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[3].size() || limit < 0) {
fprintf(stderr, "ERROR: invalid limit %s\n", tokens[3].toString().c_str());
return false;
}
}
std::map<ClusterName, DataClusterMetadata> clusters = wait(MetaclusterAPI::listClusters(db, begin, end, limit));
if (clusters.empty()) {
if (tokens.size() == 2) {
printf("The metacluster has no registered data clusters\n");
} else {
printf("The metacluster has no registered data clusters in the specified range\n");
}
}
int index = 0;
for (auto cluster : clusters) {
printf(" %d. %s\n", ++index, printable(cluster.first).c_str());
}
return true;
}
// metacluster get command
ACTOR Future<bool> metaclusterGetCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() > 3) {
fmt::print("Usage: metacluster get <NAME>\n\n");
fmt::print("Prints metadata associated with the given data cluster.\n");
return false;
}
DataClusterMetadata metadata = wait(MetaclusterAPI::getCluster(db, tokens[2]));
printf(" id: %" PRId64 "\n", metadata.entry.id);
printf(" connection string: %s\n", metadata.connectionString.toString().c_str());
printf(" tenant group capacity: %d\n", metadata.entry.capacity.numTenantGroups);
printf(" allocated tenant groups: %d\n", metadata.entry.allocated.numTenantGroups);
return true;
}
// metacluster configure command
ACTOR Future<bool> metaclusterConfigureCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() < 4) {
fmt::print("Usage: metacluster configure <NAME> <max_tenant_groups=<NUM_GROUPS>|\n"
"connection_string=<CONNECTION_STRING>> ...\n\n");
fmt::print("Updates the configuration of the metacluster.\n");
printMetaclusterConfigureOptionsUsage();
return false;
}
state Reference<ITransaction> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<DataClusterMetadata> metadata = wait(MetaclusterAPI::tryGetClusterTransaction(tr, tokens[2]));
if (!metadata.present()) {
throw cluster_not_found();
}
auto config = parseClusterConfiguration(tokens, metadata.get().entry, 3);
if (!config.present()) {
return false;
}
wait(MetaclusterAPI::updateClusterMetadataTransaction(
tr, tokens[2], config.get().first, config.get().second));
wait(safeThreadFutureToFuture(tr->commit()));
break;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
return true;
}
// metacluster command
Future<bool> metaclusterCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() == 1) {
printUsage(tokens[0]);
return true;
} else if (tokencmp(tokens[1], "register")) {
return metaclusterRegisterCommand(db, tokens);
} else if (tokencmp(tokens[1], "remove")) {
return metaclusterRemoveCommand(db, tokens);
} else if (tokencmp(tokens[1], "list")) {
return metaclusterListCommand(db, tokens);
} else if (tokencmp(tokens[1], "get")) {
return metaclusterGetCommand(db, tokens);
} else if (tokencmp(tokens[1], "configure")) {
return metaclusterConfigureCommand(db, tokens);
} else {
printUsage(tokens[0]);
return true;
}
}
void metaclusterGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
if (tokens.size() == 1) {
const char* opts[] = { "register", "remove", "list", "get", "configure", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() > 1 && (tokencmp(tokens[1], "register") || tokencmp(tokens[1], "configure"))) {
const char* opts[] = { "max_tenant_groups=", "connection_string=", nullptr };
arrayGenerator(text, line, opts, lc);
}
}
std::vector<const char*> metaclusterHintGenerator(std::vector<StringRef> const& tokens, bool inArgument) {
if (tokens.size() == 1) {
return { "<register|remove|list|get|configure>", "[ARGS]" };
} else if (tokencmp(tokens[1], "register")) {
static std::vector<const char*> opts = {
"<NAME>", "<max_tenant_groups=<NUM_GROUPS>|connection_string=<CONNECTION_STRING>>"
};
return std::vector<const char*>(opts.begin() + std::min<int>(1, tokens.size() - 2), opts.end());
} else if (tokencmp(tokens[1], "remove") && tokens.size() < 3) {
static std::vector<const char*> opts = { "<NAME>" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "list") && tokens.size() < 5) {
static std::vector<const char*> opts = { "[BEGIN]", "[END]", "[LIMIT]" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "get") && tokens.size() < 3) {
static std::vector<const char*> opts = { "<NAME>" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "configure")) {
static std::vector<const char*> opts = {
"<NAME>", "<max_tenant_groups=<NUM_GROUPS>|connection_string=<CONNECTION_STRING>>"
};
return std::vector<const char*>(opts.begin() + std::min<int>(1, tokens.size() - 2), opts.end());
} else {
return std::vector<const char*>();
}
}
CommandFactory metaclusterRegisterFactory("metacluster",
CommandHelp("metacluster <register|remove|list|get|configure> [ARGS]",
"view and manage a metacluster",
"Use `register' to add a data cluster to the metacluster."),
&metaclusterGenerator,
&metaclusterHintGenerator);
} // namespace fdb_cli

View File

@ -36,9 +36,30 @@ namespace fdb_cli {
const KeyRangeRef tenantSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant_map/"),
LiteralStringRef("\xff\xff/management/tenant_map0"));
std::pair<bool, Optional<TenantMapEntry>> parseTenantConfiguration(std::vector<StringRef> const& tokens,
TenantMapEntry const& defaults,
int startIndex) {
Optional<TenantMapEntry> entry;
for (int tokenNum = startIndex; tokenNum < tokens.size(); ++tokenNum) {
StringRef token = tokens[tokenNum];
StringRef param = token.eat("=");
std::string value = token.toString();
if (tokencmp(param, "tenant_group")) {
entry = defaults;
// TODO: store tenant group
} else {
fprintf(stderr, "ERROR: unrecognized configuration parameter %s\n", param.toString().c_str());
return std::make_pair(false, Optional<TenantMapEntry>());
}
}
return std::make_pair(true, entry);
}
// createtenant command
ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() != 2) {
if (tokens.size() < 2 || tokens.size() > 3) {
printUsage(tokens[0]);
return false;
}
@ -47,6 +68,13 @@ ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector
state Reference<ITransaction> tr = db->createTransaction();
state bool doneExistenceCheck = false;
auto configuration = parseTenantConfiguration(tokens, TenantMapEntry(), 2);
if (!configuration.first) {
return false;
}
// TODO: use the tenant group
loop {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
@ -78,10 +106,12 @@ ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector
return true;
}
CommandFactory createTenantFactory("createtenant",
CommandHelp("createtenant <TENANT_NAME>",
CommandFactory createTenantFactory(
"createtenant",
CommandHelp("createtenant <TENANT_NAME> [tenant_group=<TENANT_GROUP>]",
"creates a new tenant in the cluster",
"Creates a new tenant in the cluster with the specified name."));
"Creates a new tenant in the cluster with the specified name. An optional group can be specified"
"that will require this tenant to be placed on the same cluster as other tenants in the same group."));
// deletetenant command
ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
@ -155,7 +185,7 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
}
if (tokens.size() == 4) {
int n = 0;
if (sscanf(tokens[3].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[3].size()) {
if (sscanf(tokens[3].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[3].size() || limit < 0) {
fprintf(stderr, "ERROR: invalid limit %s\n", tokens[3].toString().c_str());
return false;
}

View File

@ -1883,6 +1883,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "metacluster")) {
bool _result = wait(makeInterruptable(metaclusterCommand(db, tokens)));
if (!_result)
is_error = true;
continue;
}
fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
is_error = true;
}

View File

@ -201,6 +201,10 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
// lock/unlock command
ACTOR Future<bool> lockCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
ACTOR Future<bool> unlockDatabaseActor(Reference<IDatabase> db, UID uid);
// metacluster command
Future<bool> metaclusterCommand(Reference<IDatabase> db, std::vector<StringRef> tokens);
// changefeed command
ACTOR Future<bool> changeFeedCommandActor(Database localDb,
Optional<TenantMapEntry> tenantEntry,

View File

@ -36,7 +36,6 @@ set(FDBCLIENT_SRCS
ClientWorkerInterface.h
ClusterConnectionFile.actor.cpp
ClusterConnectionFile.h
ClusterConnectionKey.actor.cpp
ClusterConnectionKey.actor.h
ClusterConnectionMemoryRecord.actor.cpp
ClusterConnectionMemoryRecord.h
@ -60,6 +59,7 @@ set(FDBCLIENT_SRCS
FluentDSampleIngestor.cpp
FileBackupAgent.actor.cpp
GenericManagementAPI.actor.h
GenericTransactionHelper.h
GlobalConfig.h
GlobalConfig.actor.h
GlobalConfig.actor.cpp
@ -82,6 +82,9 @@ set(FDBCLIENT_SRCS
LocalClientAPI.h
ManagementAPI.actor.cpp
ManagementAPI.actor.h
Metacluster.cpp
Metacluster.h
MetaclusterManagement.actor.h
MonitorLeader.actor.cpp
MonitorLeader.h
MultiVersionAssignmentVars.h

View File

@ -1,167 +0,0 @@
/*
* ClusterConnectionKey.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ClusterConnectionKey.actor.h"
#include "flow/actorcompiler.h" // has to be last include
// Creates a cluster connection record with a given connection string and saves it to the specified key. Needs to be
// persisted should be set to true unless this ClusterConnectionKey is being created with the value read from the
// key.
ClusterConnectionKey::ClusterConnectionKey(Database db,
Key connectionStringKey,
ClusterConnectionString const& contents,
ConnectionStringNeedsPersisted needsToBePersisted)
: IClusterConnectionRecord(needsToBePersisted), db(db), connectionStringKey(connectionStringKey) {
if (!needsToBePersisted) {
lastPersistedConnectionString = ValueRef(contents.toString());
}
cs = contents;
}
// Loads and parses the connection string at the specified key, throwing errors if the file cannot be read or the
// format is invalid.
ACTOR Future<Reference<ClusterConnectionKey>> ClusterConnectionKey::loadClusterConnectionKey(Database db,
Key connectionStringKey) {
state Transaction tr(db);
loop {
try {
Optional<Value> v = wait(tr.get(connectionStringKey));
if (!v.present()) {
throw connection_string_invalid();
}
return makeReference<ClusterConnectionKey>(db,
connectionStringKey,
ClusterConnectionString(v.get().toString()),
ConnectionStringNeedsPersisted::False);
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Sets the connections string held by this object and persists it.
Future<Void> ClusterConnectionKey::setAndPersistConnectionString(ClusterConnectionString const& connectionString) {
cs = connectionString;
return success(persist());
}
// Get the connection string stored in the database.
ACTOR Future<ClusterConnectionString> ClusterConnectionKey::getStoredConnectionStringImpl(
Reference<ClusterConnectionKey> self) {
Reference<ClusterConnectionKey> cck =
wait(ClusterConnectionKey::loadClusterConnectionKey(self->db, self->connectionStringKey));
return cck->cs;
}
Future<ClusterConnectionString> ClusterConnectionKey::getStoredConnectionString() {
return getStoredConnectionStringImpl(Reference<ClusterConnectionKey>::addRef(this));
}
ACTOR Future<bool> ClusterConnectionKey::upToDateImpl(Reference<ClusterConnectionKey> self,
ClusterConnectionString* connectionString) {
try {
// the cluster file hasn't been created yet so there's nothing to check
if (self->needsToBePersisted())
return true;
Reference<ClusterConnectionKey> temp =
wait(ClusterConnectionKey::loadClusterConnectionKey(self->db, self->connectionStringKey));
*connectionString = temp->getConnectionString();
return connectionString->toString() == self->cs.toString();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ClusterKeyError").error(e).detail("Key", self->connectionStringKey);
return false; // Swallow the error and report that the file is out of date
}
}
// Checks whether the connection string in the database matches the connection string stored in memory. The cluster
// string stored in the database is returned via the reference parameter connectionString.
Future<bool> ClusterConnectionKey::upToDate(ClusterConnectionString& connectionString) {
return upToDateImpl(Reference<ClusterConnectionKey>::addRef(this), &connectionString);
}
// Returns the key where the connection string is stored.
std::string ClusterConnectionKey::getLocation() const {
return printable(connectionStringKey);
}
// Creates a copy of this object with a modified connection string but that isn't persisted.
Reference<IClusterConnectionRecord> ClusterConnectionKey::makeIntermediateRecord(
ClusterConnectionString const& connectionString) const {
return makeReference<ClusterConnectionKey>(db, connectionStringKey, connectionString);
}
// Returns a string representation of this cluster connection record. This will include the type of record and the
// key where the record is stored.
std::string ClusterConnectionKey::toString() const {
return "fdbkey://" + printable(connectionStringKey);
}
ACTOR Future<bool> ClusterConnectionKey::persistImpl(Reference<ClusterConnectionKey> self) {
self->setPersisted();
state Value newConnectionString = ValueRef(self->cs.toString());
try {
state Transaction tr(self->db);
loop {
try {
Optional<Value> existingConnectionString = wait(tr.get(self->connectionStringKey));
// Someone has already updated the connection string to what we want
if (existingConnectionString.present() && existingConnectionString.get() == newConnectionString) {
self->lastPersistedConnectionString = newConnectionString;
return true;
}
// Someone has updated the connection string to something we didn't expect, in which case we leave it
// alone. It's possible this could result in the stored string getting stuck if the connection string
// changes twice and only the first change is recorded. If the process that wrote the first change dies
// and no other process attempts to write the intermediate state, then only a newly opened connection
// key would be able to update the state.
else if (existingConnectionString.present() &&
existingConnectionString != self->lastPersistedConnectionString) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionKeyDueToMismatch")
.detail("ConnectionKey", self->connectionStringKey)
.detail("NewConnectionString", newConnectionString)
.detail("ExpectedStoredConnectionString", self->lastPersistedConnectionString)
.detail("ActualStoredConnectionString", existingConnectionString);
return false;
}
tr.set(self->connectionStringKey, newConnectionString);
wait(tr.commit());
self->lastPersistedConnectionString = newConnectionString;
return true;
} catch (Error& e) {
wait(tr.onError(e));
}
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionKey")
.error(e)
.detail("ConnectionKey", self->connectionStringKey)
.detail("ConnectionString", self->cs.toString());
}
return false;
};
// Writes the connection string to the database
Future<bool> ClusterConnectionKey::persist() {
return persistImpl(Reference<ClusterConnectionKey>::addRef(this));
}

View File

@ -29,61 +29,162 @@
#define FDBCLIENT_CLUSTERCONNECTIONKEY_ACTOR_H
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/actorcompiler.h" // has to be last include
// An implementation of IClusterConnectionRecord backed by a key in a FoundationDB database.
class ClusterConnectionKey : public IClusterConnectionRecord, ReferenceCounted<ClusterConnectionKey>, NonCopyable {
template <class DB>
class ClusterConnectionKey : public IClusterConnectionRecord, ReferenceCounted<ClusterConnectionKey<DB>>, NonCopyable {
public:
// Creates a cluster connection record with a given connection string and saves it to the specified key. Needs to be
// persisted should be set to true unless this ClusterConnectionKey is being created with the value read from the
// key.
ClusterConnectionKey(Database db,
ClusterConnectionKey(DB db,
Key connectionStringKey,
ClusterConnectionString const& contents,
ConnectionStringNeedsPersisted needsToBePersisted = ConnectionStringNeedsPersisted::True);
ConnectionStringNeedsPersisted needsToBePersisted = ConnectionStringNeedsPersisted::True)
: IClusterConnectionRecord(needsToBePersisted), db(db), connectionStringKey(connectionStringKey) {
if (!needsToBePersisted) {
lastPersistedConnectionString = ValueRef(contents.toString());
}
cs = contents;
}
// Loads and parses the connection string at the specified key, throwing errors if the file cannot be read or the
// format is invalid.
ACTOR static Future<Reference<ClusterConnectionKey>> loadClusterConnectionKey(Database db, Key connectionStringKey);
// Loads and parses the connection string at the specified key, throwing errors if the file cannot be read or
// the format is invalid.
ACTOR static Future<Reference<ClusterConnectionKey>> loadClusterConnectionKey(DB db, Key connectionStringKey) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
typename transaction_future_type<Transaction, Optional<Value>>::type f = tr->get(connectionStringKey);
Optional<Value> v = wait(safeThreadFutureToFuture(f));
if (!v.present()) {
throw connection_string_invalid();
}
return makeReference<ClusterConnectionKey>(db,
connectionStringKey,
ClusterConnectionString(v.get().toString()),
ConnectionStringNeedsPersisted::False);
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
// Sets the connections string held by this object and persists it.
Future<Void> setAndPersistConnectionString(ClusterConnectionString const&) override;
Future<Void> setAndPersistConnectionString(ClusterConnectionString const& connectionString) override {
cs = connectionString;
return success(persist());
}
// Get the connection string stored in the database.
Future<ClusterConnectionString> getStoredConnectionString() override;
Future<ClusterConnectionString> getStoredConnectionString() override {
return getStoredConnectionStringImpl(Reference<ClusterConnectionKey>::addRef(this));
}
// Checks whether the connection string in the database matches the connection string stored in memory. The cluster
// string stored in the database is returned via the reference parameter connectionString.
Future<bool> upToDate(ClusterConnectionString& connectionString) override;
// Checks whether the connection string in the database matches the connection string stored in memory. The
// cluster string stored in the database is returned via the reference parameter connectionString.
Future<bool> upToDate(ClusterConnectionString& connectionString) override {
return upToDateImpl(Reference<ClusterConnectionKey>::addRef(this), &connectionString);
}
// Returns the key where the connection string is stored.
std::string getLocation() const override;
std::string getLocation() const override { return printable(connectionStringKey); }
// Creates a copy of this object with a modified connection string but that isn't persisted.
Reference<IClusterConnectionRecord> makeIntermediateRecord(
ClusterConnectionString const& connectionString) const override;
ClusterConnectionString const& connectionString) const override {
return makeReference<ClusterConnectionKey>(db, connectionStringKey, connectionString);
}
// Returns a string representation of this cluster connection record. This will include the type of record and the
// key where the record is stored.
std::string toString() const override;
// Returns a string representation of this cluster connection record. This will include the type of record and
// the key where the record is stored.
std::string toString() const override { return "fdbkey://" + printable(connectionStringKey); }
void addref() override { ReferenceCounted<ClusterConnectionKey>::addref(); }
void delref() override { ReferenceCounted<ClusterConnectionKey>::delref(); }
protected:
// Writes the connection string to the database
Future<bool> persist() override;
Future<bool> persist() override { return persistImpl(Reference<ClusterConnectionKey>::addRef(this)); }
private:
ACTOR static Future<ClusterConnectionString> getStoredConnectionStringImpl(Reference<ClusterConnectionKey> self);
ACTOR static Future<bool> upToDateImpl(Reference<ClusterConnectionKey> self,
ClusterConnectionString* connectionString);
ACTOR static Future<bool> persistImpl(Reference<ClusterConnectionKey> self);
ACTOR static Future<ClusterConnectionString> getStoredConnectionStringImpl(Reference<ClusterConnectionKey> self) {
Reference<ClusterConnectionKey> cck =
wait(ClusterConnectionKey::loadClusterConnectionKey(self->db, self->connectionStringKey));
return cck->cs;
}
// The database where the connection key is stored. Note that this does not need to be the same database as the one
// that the connection string would connect to.
Database db;
ACTOR static Future<bool> upToDateImpl(Reference<ClusterConnectionKey> self,
ClusterConnectionString* connectionString) {
try {
// the cluster file hasn't been created yet so there's nothing to check
if (self->needsToBePersisted())
return true;
Reference<ClusterConnectionKey> temp =
wait(ClusterConnectionKey::loadClusterConnectionKey(self->db, self->connectionStringKey));
*connectionString = temp->getConnectionString();
return connectionString->toString() == self->cs.toString();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "ClusterKeyError").error(e).detail("Key", self->connectionStringKey);
return false; // Swallow the error and report that the file is out of date
}
}
ACTOR static Future<bool> persistImpl(Reference<ClusterConnectionKey> self) {
self->setPersisted();
state Value newConnectionString = ValueRef(self->cs.toString());
try {
state Reference<typename DB::TransactionT> tr = self->db->createTransaction();
loop {
try {
typename transaction_future_type<Transaction, Optional<Value>>::type f =
tr->get(self->connectionStringKey);
Optional<Value> existingConnectionString = wait(safeThreadFutureToFuture(f));
// Someone has already updated the connection string to what we want
if (existingConnectionString.present() && existingConnectionString.get() == newConnectionString) {
self->lastPersistedConnectionString = newConnectionString;
return true;
}
// Someone has updated the connection string to something we didn't expect, in which case we leave
// it alone. It's possible this could result in the stored string getting stuck if the connection
// string changes twice and only the first change is recorded. If the process that wrote the first
// change dies and no other process attempts to write the intermediate state, then only a newly
// opened connection key would be able to update the state.
else if (existingConnectionString.present() &&
existingConnectionString != self->lastPersistedConnectionString) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionKeyDueToMismatch")
.detail("ConnectionKey", self->connectionStringKey)
.detail("NewConnectionString", newConnectionString)
.detail("ExpectedStoredConnectionString", self->lastPersistedConnectionString)
.detail("ActualStoredConnectionString", existingConnectionString);
return false;
}
tr->set(self->connectionStringKey, newConnectionString);
wait(safeThreadFutureToFuture(tr->commit()));
self->lastPersistedConnectionString = newConnectionString;
return true;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
} catch (Error& e) {
TraceEvent(SevWarnAlways, "UnableToChangeConnectionKey")
.error(e)
.detail("ConnectionKey", self->connectionStringKey)
.detail("ConnectionString", self->cs.toString());
}
return false;
}
// The database where the connection key is stored. Note that this does not need to be the same database as the
// one that the connection string would connect to.
DB db;
Key connectionStringKey;
Optional<Value> lastPersistedConnectionString;
};

View File

@ -1328,7 +1328,7 @@ struct TenantMode {
// These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones
// just before END.
// Note: OPTIONAL_TENANT is not named OPTIONAL because of a collision with a Windows macro.
enum Mode { DISABLED = 0, OPTIONAL_TENANT = 1, REQUIRED = 2, END = 3 };
enum Mode { DISABLED = 0, OPTIONAL_TENANT = 1, REQUIRED = 2, MANAGEMENT = 3, SUBORDINATE = 4, END = 5 };
TenantMode() : mode(DISABLED) {}
TenantMode(Mode mode) : mode(mode) {
@ -1348,9 +1348,13 @@ struct TenantMode {
case DISABLED:
return "disabled";
case OPTIONAL_TENANT:
return "optional_experimental";
return "optional";
case REQUIRED:
return "required_experimental";
return "required";
case MANAGEMENT:
return "management";
case SUBORDINATE:
return "subordinate";
default:
ASSERT(false);
}

View File

@ -36,6 +36,7 @@ the contents of the system key space.
#include <map>
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/Status.h"
#include "fdbclient/Subspace.h"
#include "fdbclient/DatabaseConfiguration.h"
@ -129,21 +130,6 @@ bool isCompleteConfiguration(std::map<std::string, std::string> const& options);
ConfigureAutoResult parseConfig(StatusObject const& status);
template <typename Transaction, class T>
struct transaction_future_type {
using type = typename Transaction::template FutureT<T>;
};
template <typename Transaction, class T>
struct transaction_future_type<Transaction*, T> {
using type = typename transaction_future_type<Transaction, T>::type;
};
template <typename Transaction, class T>
struct transaction_future_type<Reference<Transaction>, T> {
using type = typename transaction_future_type<Transaction, T>::type;
};
// Management API written in template code to support both IClientAPI and NativeAPI
namespace ManagementAPI {

View File

@ -0,0 +1,42 @@
/*
* GenericTransactionHelper.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_GENERIC_TRANSACTION_HELPER_H
#define FDBCLIENT_GENERIC_TRANSACTION_HELPER_H
#pragma once
#include "flow/FastRef.h"
template <typename Transaction, class T>
struct transaction_future_type {
using type = typename Transaction::template FutureT<T>;
};
template <typename Transaction, class T>
struct transaction_future_type<Transaction*, T> {
using type = typename transaction_future_type<Transaction, T>::type;
};
template <typename Transaction, class T>
struct transaction_future_type<Reference<Transaction>, T> {
using type = typename transaction_future_type<Transaction, T>::type;
};
#endif

View File

@ -190,12 +190,16 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
TenantMode tenantMode;
if (value == "disabled") {
tenantMode = TenantMode::DISABLED;
} else if (value == "optional_experimental") {
} else if (value == "optional_experimental" || value == "optional") {
tenantMode = TenantMode::OPTIONAL_TENANT;
} else if (value == "required_experimental") {
} else if (value == "required_experimental" || value == "required") {
tenantMode = TenantMode::REQUIRED;
} else if (value == "management") {
tenantMode = TenantMode::MANAGEMENT;
} else if (value == "subordinate") {
tenantMode = TenantMode::SUBORDINATE;
} else {
printf("Error: Only disabled|optional_experimental|required_experimental are valid for tenant_mode.\n");
printf("Error: Only disabled|optional|required|management|subordinate are valid for tenant_mode.\n");
return out;
}
out[p + key] = format("%d", tenantMode);

27
fdbclient/Metacluster.cpp Normal file
View File

@ -0,0 +1,27 @@
/*
* Metacluster.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/Metacluster.h"
json_spirit::mObject ClusterUsage::toJson() {
json_spirit::mObject obj;
obj["num_tenant_groups"] = numTenantGroups;
return obj;
}

86
fdbclient/Metacluster.h Normal file
View File

@ -0,0 +1,86 @@
/*
* Metacluster.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_METACLUSTER_H
#define FDBCLIENT_METACLUSTER_H
#include "CoordinationInterface.h"
#include "json_spirit/json_spirit_value.h"
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/VersionedMap.h"
#include "flow/flat_buffers.h"
typedef StringRef ClusterNameRef;
typedef Standalone<ClusterNameRef> ClusterName;
struct ClusterUsage {
int numTenantGroups = 0;
ClusterUsage() = default;
ClusterUsage(int numTenantGroups) : numTenantGroups(numTenantGroups) {}
json_spirit::mObject toJson();
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, numTenantGroups);
}
};
template <>
struct Traceable<ClusterUsage> : std::true_type {
static std::string toString(const ClusterUsage& value) {
return format("NumTenantGroups: %d", value.numTenantGroups);
}
};
struct DataClusterEntry {
constexpr static FileIdentifier file_identifier = 929511;
static Value idToValue(int64_t id) {
int64_t swapped = bigEndian64(id);
return StringRef(reinterpret_cast<const uint8_t*>(&swapped), 8);
}
static int64_t valueToId(ValueRef value) {
ASSERT(value.size() == 8);
int64_t id = *reinterpret_cast<const int64_t*>(value.begin());
id = bigEndian64(id);
ASSERT(id >= 0);
return id;
}
int64_t id = -1;
ClusterUsage capacity;
ClusterUsage allocated;
DataClusterEntry() = default;
DataClusterEntry(ClusterUsage capacity) : capacity(capacity) {}
DataClusterEntry(int64_t id, ClusterUsage capacity, ClusterUsage allocated)
: id(id), capacity(capacity), allocated(allocated) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, capacity, allocated);
}
};
#endif

View File

@ -0,0 +1,374 @@
/*
* MetaclusterManagement.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_METACLUSTER_MANAGEMENT_ACTOR_G_H)
#define FDBCLIENT_METACLUSTER_MANAGEMENT_ACTOR_G_H
#include "fdbclient/MetaclusterManagement.actor.g.h"
#elif !defined(FDBCLIENT_METACLUSTER_MANAGEMENT_ACTOR_H)
#define FDBCLIENT_METACLUSTER_MANAGEMENT_ACTOR_H
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/Metacluster.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/VersionedMap.h"
#include "flow/flat_buffers.h"
#include "flow/actorcompiler.h" // has to be last include
struct DataClusterMetadata {
DataClusterEntry entry;
ClusterConnectionString connectionString;
DataClusterMetadata() = default;
DataClusterMetadata(DataClusterEntry const& entry, ClusterConnectionString const& connectionString)
: entry(entry), connectionString(connectionString) {}
};
namespace MetaclusterAPI {
const KeyRangeRef tenantSpecialKeyRange(LiteralStringRef("\xff\xff/management/tenant_map/"),
LiteralStringRef("\xff\xff/management/tenant_map0"));
ACTOR template <class Transaction>
Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, ClusterNameRef name) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
state typename transaction_future_type<Transaction, Optional<Value>>::type metadataFuture =
tr->get(dataClusterMetadataKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type connectionRecordFuture =
tr->get(dataClusterConnectionRecordKey);
state Optional<Value> metadata = wait(safeThreadFutureToFuture(metadataFuture));
Optional<Value> connectionString = wait(safeThreadFutureToFuture(connectionRecordFuture));
if (metadata.present()) {
ASSERT(connectionString.present());
return Optional<DataClusterMetadata>(DataClusterMetadata(
decodeDataClusterEntry(metadata.get()), ClusterConnectionString(connectionString.get().toString())));
} else {
return Optional<DataClusterMetadata>();
}
}
ACTOR template <class DB>
Future<Optional<DataClusterMetadata>> tryGetCluster(Reference<DB> db, ClusterName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
return metadata;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<DataClusterMetadata> getClusterTransaction(Transaction tr, ClusterNameRef name) {
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
if (!metadata.present()) {
throw cluster_not_found();
}
return metadata.get();
}
ACTOR template <class DB>
Future<DataClusterMetadata> getCluster(Reference<DB> db, ClusterName name) {
Optional<DataClusterMetadata> metadata = wait(tryGetCluster(db, name));
if (!metadata.present()) {
throw cluster_not_found();
}
return metadata.get();
}
// This should only be called from a transaction that has already confirmed that the cluster entry
// is present. The updatedEntry should use the existing entry and modify only those fields that need
// to be changed.
ACTOR template <class Transaction>
Future<Void> updateClusterMetadataTransaction(Transaction tr,
ClusterNameRef name,
Optional<std::string> updatedConnectionString,
Optional<DataClusterEntry> updatedEntry) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
if (updatedEntry.present()) {
tr->set(name.withPrefix(dataClusterMetadataPrefix), encodeDataClusterEntry(updatedEntry.get()));
}
if (updatedConnectionString.present()) {
tr->set(name.withPrefix(dataClusterConnectionRecordPrefix), updatedConnectionString.get());
}
return Void();
}
ACTOR template <class Transaction>
Future<Optional<DataClusterEntry>> registerClusterTransaction(Transaction tr,
ClusterNameRef name,
std::string connectionString,
DataClusterEntry entry) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
if (name.startsWith("\xff"_sr)) {
throw invalid_cluster_name();
}
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
state Future<Optional<DataClusterMetadata>> dataClusterMetadataFuture = tryGetClusterTransaction(tr, name);
state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture =
tr->get(dataClusterLastIdKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
if (!tenantMode.present() || tenantMode.get() != StringRef(format("%d", TenantMode::MANAGEMENT))) {
throw invalid_metacluster_operation();
}
Optional<DataClusterMetadata> dataClusterMetadata = wait(dataClusterMetadataFuture);
if (dataClusterMetadata.present()) {
return Optional<DataClusterEntry>();
}
state Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture));
entry.id = lastIdVal.present() ? DataClusterEntry::valueToId(lastIdVal.get()) + 1 : 0;
entry.allocated = ClusterUsage();
tr->set(dataClusterLastIdKey, DataClusterEntry::idToValue(entry.id));
tr->set(dataClusterMetadataKey, encodeDataClusterEntry(entry));
tr->set(dataClusterConnectionRecordKey, connectionString);
return entry;
}
ACTOR template <class DB>
Future<Void> registerCluster(Reference<DB> db, ClusterName name, std::string connectionString, DataClusterEntry entry) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (firstTry) {
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
if (metadata.present()) {
throw cluster_already_exists();
}
firstTry = false;
}
state Optional<DataClusterEntry> newCluster =
wait(registerClusterTransaction(tr, name, connectionString, entry));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("RegisteredDataCluster")
.detail("ClusterName", name)
.detail("ClusterId", newCluster.present() ? newCluster.get().id : -1)
.detail("Capacity", entry.capacity)
.detail("Version", tr->getCommittedVersion())
.detail("ConnectionString", connectionString);
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<Optional<DataClusterEntry>> restoreClusterTransaction(Transaction tr,
ClusterName name,
std::string connectionString,
ClusterUsage capacity,
ClusterUsage allocated) {
wait(delay(0)); // TODO: remove when implementation is added
return Optional<DataClusterEntry>();
}
ACTOR template <class DB>
Future<Void> restoreCluster(Reference<DB> db,
ClusterName name,
std::string connectionString,
ClusterUsage capacity,
ClusterUsage allocated) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Optional<DataClusterEntry> newCluster =
wait(restoreCluster(tr, name, connectionString, capacity, allocated));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("RestoredDataCluster")
.detail("ClusterName", name)
.detail("ClusterId", newCluster.present() ? newCluster.get().id : -1)
.detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<Void> removeClusterTransaction(Transaction tr, ClusterNameRef name) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
state Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
if (!metadata.present()) {
return Void();
}
// TODO: verify tenant map for cluster is empty
tr->clear(dataClusterMetadataKey);
tr->clear(dataClusterConnectionRecordKey);
return Void();
}
ACTOR template <class DB>
Future<Void> removeCluster(Reference<DB> db, ClusterName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (firstTry) {
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
if (!metadata.present()) {
throw cluster_not_found();
}
firstTry = false;
}
wait(removeClusterTransaction(tr, name));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<std::map<ClusterName, DataClusterMetadata>> listClustersTransaction(Transaction tr,
ClusterNameRef begin,
ClusterNameRef end,
int limit) {
state KeyRange metadataRange = KeyRangeRef(begin, end).withPrefix(dataClusterMetadataPrefix);
state KeyRange connectionStringRange = KeyRangeRef(begin, end).withPrefix(dataClusterConnectionRecordPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
state typename transaction_future_type<Transaction, RangeResult>::type metadataFuture =
tr->getRange(firstGreaterOrEqual(metadataRange.begin), firstGreaterOrEqual(metadataRange.end), limit);
state typename transaction_future_type<Transaction, RangeResult>::type connectionStringFuture = tr->getRange(
firstGreaterOrEqual(connectionStringRange.begin), firstGreaterOrEqual(connectionStringRange.end), limit);
state RangeResult metadata = wait(safeThreadFutureToFuture(metadataFuture));
RangeResult connectionStrings = wait(safeThreadFutureToFuture(connectionStringFuture));
ASSERT(metadata.size() == connectionStrings.size());
std::map<ClusterName, DataClusterMetadata> clusters;
for (int i = 0; i < metadata.size(); ++i) {
clusters[metadata[i].key.removePrefix(dataClusterMetadataPrefix)] = DataClusterMetadata(
decodeDataClusterEntry(metadata[i].value), ClusterConnectionString(connectionStrings[i].value.toString()));
}
return clusters;
}
ACTOR template <class DB>
Future<std::map<ClusterName, DataClusterMetadata>> listClusters(Reference<DB> db,
ClusterName begin,
ClusterName end,
int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::map<ClusterName, DataClusterMetadata> clusters = wait(listClustersTransaction(tr, begin, end, limit));
return clusters;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
}; // namespace MetaclusterAPI
#include "flow/unactorcompiler.h"
#endif

View File

@ -1479,6 +1479,12 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
if (apiVersionAtLeast(720)) {
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::METACLUSTER,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<DataClusterMapRangeImpl>(SpecialKeySpace::getMetaclusterApiCommandRange("dataclustermap")));
}
if (apiVersionAtLeast(710)) {
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::MANAGEMENT,

View File

@ -818,8 +818,10 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"tenant_mode": {
"$enum":[
"disabled",
"optional_experimental",
"required_experimental"
"optional",
"required",
"management",
"subordinate"
]}
},
"data":{

View File

@ -36,6 +36,7 @@
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/MetaclusterManagement.actor.h"
#include "fdbclient/StatusClient.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -56,6 +57,7 @@ static bool isAlphaNumeric(const std::string& key) {
} // namespace
const KeyRangeRef TenantMapRangeImpl::submoduleRange = KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr);
const KeyRangeRef DataClusterMapRangeImpl::submoduleRange = KeyRangeRef("data_cluster/"_sr, "data_cluster0"_sr);
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
{ SpecialKeySpace::MODULE::TRANSACTION,
@ -80,7 +82,9 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) },
{ SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"),
LiteralStringRef("\xff\xff/actor_profiler_conf0")) }
LiteralStringRef("\xff\xff/actor_profiler_conf0")) },
{ SpecialKeySpace::MODULE::METACLUSTER,
KeyRangeRef(LiteralStringRef("\xff\xff/metacluster/"), LiteralStringRef("\xff\xff/metacluster0")) }
};
std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
@ -129,6 +133,11 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiComman
.withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) }
};
std::unordered_map<std::string, KeyRange> SpecialKeySpace::metaclusterApiCommandToRange = {
{ "dataclustermap",
DataClusterMapRangeImpl::submoduleRange.withPrefix(moduleToBoundary[MODULE::METACLUSTER].begin) }
};
std::set<std::string> SpecialKeySpace::options = { "excluded/force",
"failed/force",
"excluded_locality/force",
@ -2843,3 +2852,109 @@ Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransacti
return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
}
DataClusterMapRangeImpl::DataClusterMapRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
ACTOR Future<RangeResult> getDataClusterList(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) {
state KeyRef metaclusterPrefix =
kr.begin.substr(0,
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin.size() +
DataClusterMapRangeImpl::submoduleRange.begin.size());
kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin);
ClusterNameRef beginCluster = kr.begin.removePrefix(DataClusterMapRangeImpl::submoduleRange.begin);
ClusterNameRef endCluster = kr.end;
if (endCluster.startsWith(DataClusterMapRangeImpl::submoduleRange.begin)) {
endCluster = endCluster.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
} else {
endCluster = "\xff"_sr;
}
std::map<ClusterName, DataClusterMetadata> clusters = wait(
MetaclusterAPI::listClustersTransaction(&ryw->getTransaction(), beginCluster, endCluster, limitsHint.rows));
RangeResult results;
for (auto cluster : clusters) {
json_spirit::mObject clusterEntry;
clusterEntry["id"] = cluster.second.entry.id;
clusterEntry["connection_string"] = cluster.second.connectionString.toString();
clusterEntry["capacity"] = cluster.second.entry.capacity.toJson();
clusterEntry["allocation"] = cluster.second.entry.allocated.toJson();
std::string clusterEntryString = json_spirit::write_string(json_spirit::mValue(clusterEntry));
ValueRef clusterEntryBytes(results.arena(), clusterEntryString);
results.push_back(results.arena(),
KeyValueRef(cluster.first.withPrefix(metaclusterPrefix, results.arena()), clusterEntryBytes));
}
return results;
}
Future<RangeResult> DataClusterMapRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
return getDataClusterList(ryw, kr, limitsHint);
}
ACTOR Future<Void> removeClusterRange(ReadYourWritesTransaction* ryw,
ClusterName beginCluster,
ClusterName endCluster) {
std::map<ClusterName, DataClusterMetadata> clusters = wait(MetaclusterAPI::listClustersTransaction(
&ryw->getTransaction(), beginCluster, endCluster, CLIENT_KNOBS->TOO_MANY));
if (clusters.size() == CLIENT_KNOBS->TOO_MANY) {
TraceEvent(SevWarn, "RemoveClustersRangeTooLange")
.detail("BeginCluster", beginCluster)
.detail("EndCluster", endCluster);
ryw->setSpecialKeySpaceErrorMsg("too many cluster to range remove");
throw special_keys_api_failure();
}
std::vector<Future<Void>> removeFutures;
for (auto cluster : clusters) {
removeFutures.push_back(MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), cluster.first));
}
wait(waitForAll(removeFutures));
return Void();
}
Future<Optional<std::string>> DataClusterMapRangeImpl::commit(ReadYourWritesTransaction* ryw) {
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
std::vector<Future<Void>> clusterManagementFutures;
for (auto range : ranges) {
if (!range.value().first) {
continue;
}
ClusterNameRef clusterName =
range.begin()
.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin)
.removePrefix(DataClusterMapRangeImpl::submoduleRange.begin);
if (range.value().second.present()) {
DataClusterEntry entry;
clusterManagementFutures.push_back(success(MetaclusterAPI::registerClusterTransaction(
&ryw->getTransaction(), clusterName, range.value().second.get().toString(), entry)));
} else {
// For a single key clear, just issue the delete
if (KeyRangeRef(range.begin(), range.end()).singleKeyRange()) {
clusterManagementFutures.push_back(
MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), clusterName));
} else {
ClusterNameRef endCluster = range.end().removePrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin);
if (endCluster.startsWith(submoduleRange.begin)) {
endCluster = endCluster.removePrefix(submoduleRange.begin);
} else {
endCluster = "\xff"_sr;
}
clusterManagementFutures.push_back(removeClusterRange(ryw, clusterName, endCluster));
}
}
}
return tag(waitForAll(clusterManagementFutures), Optional<std::string>());
}

View File

@ -171,6 +171,7 @@ public:
ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space
GLOBALCONFIG, // Global configuration options synchronized to all nodes
MANAGEMENT, // Management-API
METACLUSTER, // Configuration for a metacluster
METRICS, // data-distribution metrics
TESTONLY, // only used by correctness tests
TRACING, // Distributed tracing options
@ -228,6 +229,9 @@ public:
static KeyRef getActorLineageApiCommandPrefix(const std::string& command) {
return actorLineageApiCommandToRange.at(command).begin;
}
static KeyRangeRef getMetaclusterApiCommandRange(const std::string& command) {
return metaclusterApiCommandToRange.at(command);
}
static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option);
static const std::set<std::string>& getManagementApiOptionsSet() { return options; }
static const std::set<std::string>& getTracingOptions() { return tracingOptions; }
@ -257,9 +261,10 @@ private:
static std::unordered_map<SpecialKeySpace::MODULE, KeyRange> moduleToBoundary;
// management command to its special keys' range
// module command to special keys range
static std::unordered_map<std::string, KeyRange> managementApiCommandToRange;
static std::unordered_map<std::string, KeyRange> actorLineageApiCommandToRange;
static std::unordered_map<std::string, KeyRange> metaclusterApiCommandToRange;
// "<command>/<option>"
static std::set<std::string> options;
@ -549,5 +554,16 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class DataClusterMapRangeImpl : public SpecialKeyRangeRWImpl {
public:
const static KeyRangeRef submoduleRange;
explicit DataClusterMapRangeImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -1408,6 +1408,26 @@ const KeyRef tenantMapPrivatePrefix = "\xff\xff/tenantMap/"_sr;
const KeyRef tenantLastIdKey = "\xff/tenantLastId/"_sr;
const KeyRef tenantDataPrefixKey = "\xff/tenantDataPrefix"_sr;
// Metacluster keys
const KeyRangeRef dataClusterMetadataKeys("\xff/metacluster/dataCluster/metadata/"_sr,
"\xff/metacluster/dataCluster/metadata0"_sr);
const KeyRef dataClusterMetadataPrefix = dataClusterMetadataKeys.begin;
const KeyRangeRef dataClusterConnectionRecordKeys("\xff/metacluster/dataCluster/connectionString/"_sr,
"\xff/metacluster/dataCluster/connectionString0"_sr);
const KeyRef dataClusterConnectionRecordPrefix = dataClusterConnectionRecordKeys.begin;
const KeyRef dataClusterLastIdKey = "\xff/metacluster/dataCluster/lastId/"_sr;
Value encodeDataClusterEntry(DataClusterEntry const& dataClusterEntry) {
return ObjectWriter::toValue(dataClusterEntry, IncludeVersion());
}
DataClusterEntry decodeDataClusterEntry(ValueRef const& value) {
DataClusterEntry entry;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(entry);
return entry;
}
// for tests
void testSSISerdes(StorageServerInterface const& ssi) {
printf("ssi=\nid=%s\nlocality=%s\nisTss=%s\ntssId=%s\nacceptingRequests=%s\naddress=%s\ngetValue=%s\n\n\n",

View File

@ -26,6 +26,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h to remove this depdendency
#include "fdbclient/Metacluster.h"
#include "fdbclient/StorageServerInterface.h"
#include "Tenant.h"
@ -627,6 +628,16 @@ extern const KeyRef tenantDataPrefixKey;
Value encodeTenantEntry(TenantMapEntry const& tenantEntry);
TenantMapEntry decodeTenantEntry(ValueRef const& value);
// Metacluster keys
extern const KeyRangeRef dataClusterMetadataKeys;
extern const KeyRef dataClusterMetadataPrefix;
extern const KeyRangeRef dataClusterConnectionRecordKeys;
extern const KeyRef dataClusterConnectionRecordPrefix;
extern const KeyRef dataClusterLastIdKey;
Value encodeDataClusterEntry(DataClusterEntry const& dataClusterEntry);
DataClusterEntry decodeDataClusterEntry(ValueRef const& value);
#pragma clang diagnostic pop
#endif

View File

@ -232,6 +232,11 @@ ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster");
ERROR( unknown_tenant, 2137, "Tenant is not available from this server")
ERROR( illegal_tenant_access, 2138, "Illegal tenant access")
ERROR( invalid_cluster_name, 2150, "Data cluster name cannot begin with \\xff" )
ERROR( invalid_metacluster_operation, 2151, "Metacluster operation performed on non-metacluster" )
ERROR( cluster_already_exists, 2152, "A data cluster with the given name already exists" )
ERROR( cluster_not_found, 2153, "Data cluster does not exist" )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )
ERROR( api_version_already_set, 2201, "API version may be set only once" )

View File

@ -233,7 +233,7 @@ logdir = {logdir}
def create_database(self, storage="ssd", enable_tenants=True):
db_config = "configure new single {}".format(storage)
if enable_tenants:
db_config += " tenant_mode=optional_experimental"
db_config += " tenant_mode=optional"
if self.blob_granules_enabled:
db_config += " blob_granules_enabled:=1"
args = [self.fdbcli_binary, "-C", self.cluster_file, "--exec", db_config]