1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-25 08:40:05 +08:00

Create a special keys API to create, delete, and read tenants. Remove the C API to create/delete tenants.

This commit is contained in:
A.J. Beamon 2022-03-06 22:16:42 -08:00
parent 1e1098ca9a
commit 8bc2b283e1
11 changed files with 129 additions and 100 deletions

@ -449,14 +449,6 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db
}).extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_allocate_tenant(FDBDatabase* db, uint8_t const* name, int name_length) {
return (FDBFuture*)(DB(db)->createTenant(TenantNameRef(name, name_length)).extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_remove_tenant(FDBDatabase* db, uint8_t const* name, int name_length) {
return (FDBFuture*)(DB(db)->deleteTenant(TenantNameRef(name, name_length)).extractPtr());
}
extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) {
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
}

@ -205,6 +205,7 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/failed_locality/<locality>`` Read/write. Indicates that the cluster should consider matching processes as permanently failed. This allows the cluster to avoid maintaining extra state and doing extra work in the hope that these processes come back. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
#. ``\xff\xff/management/options/excluded_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/options/failed_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/tenant_map/<tenant>`` Read/write. Setting a key in this range to any value will result in a tenant being created with name `<tenant>`. Clearing a key in this range will delete the tenant with name `<tenant>`. Reading all or a portion of this range will return the list of tenants currently present in the cluster, excluding any created in this transaction. Values read in this range will be JSON objects containing the metadata for the associated tenants. Note: the tenants key-space does not support range clears.
An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g ``locality_dcid:primary-satellite`` or

@ -630,7 +630,7 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db,
std::string generateErrorMessage(const CoordinatorsResult& res);
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Reference<Transaction> tr, TenantName name) {
Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantName name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
@ -656,7 +656,7 @@ Future<Optional<TenantMapEntry>> tryGetTenant(Reference<DB> db, TenantName name)
}
ACTOR template <class Transaction>
Future<TenantMapEntry> getTenantTransaction(Reference<Transaction> tr, TenantName name) {
Future<TenantMapEntry> getTenantTransaction(Transaction tr, TenantName name) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (!entry.present()) {
throw tenant_not_found();
@ -676,7 +676,7 @@ Future<TenantMapEntry> getTenant(Reference<DB> db, TenantName name) {
}
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> createTenantTransaction(Reference<Transaction> tr, TenantName name) {
Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantNameRef name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
if (name.startsWith("\xff"_sr)) {
@ -758,7 +758,7 @@ Future<Void> createTenant(Reference<DB> db, TenantName name) {
}
ACTOR template <class Transaction>
Future<Void> deleteTenantTransaction(Reference<Transaction> tr, TenantName name) {
Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -816,9 +816,9 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
}
ACTOR template <class Transaction>
Future<Standalone<VectorRef<TenantNameRef>>> listTenantsTransaction(Reference<Transaction> tr,
StringRef begin,
StringRef end,
Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction tr,
TenantNameRef begin,
TenantNameRef end,
int limit) {
state KeyRange range = KeyRangeRef(begin, end).withPrefix(tenantMapPrefix);
@ -828,22 +828,24 @@ Future<Standalone<VectorRef<TenantNameRef>>> listTenantsTransaction(Reference<Tr
RangeResult results = wait(safeThreadFutureToFuture(
tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit)));
Standalone<VectorRef<TenantNameRef>> tenants;
std::map<TenantName, TenantMapEntry> tenants;
for (auto kv : results) {
tenants.push_back_deep(tenants.arena(), kv.key.removePrefix(tenantMapPrefix));
tenants[kv.key.removePrefix(tenantMapPrefix)] = decodeTenantEntry(kv.value);
}
return tenants;
}
ACTOR template <class DB>
Future<Standalone<VectorRef<TenantNameRef>>> listTenants(Reference<DB> db, StringRef begin, StringRef end, int limit) {
Future<std::map<TenantName, TenantMapEntry>> listTenants(Reference<DB> db,
TenantName begin,
TenantName end,
int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state KeyRange range = KeyRangeRef(begin, end).withPrefix(tenantMapPrefix);
loop {
try {
Standalone<VectorRef<TenantNameRef>> tenants = wait(listTenantsTransaction(tr, begin, end, limit));
std::map<TenantName, TenantMapEntry> tenants = wait(listTenantsTransaction(tr, begin, end, limit));
return tenants;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));

@ -140,12 +140,6 @@ public:
virtual ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) = 0;
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
virtual ThreadFuture<Void> createTenant(TenantNameRef const& tenantName) = 0;
// Deletes the tenant with the given name. The tenant must be empty.
virtual ThreadFuture<Void> deleteTenant(TenantNameRef const& tenantName) = 0;
virtual void addref() = 0;
virtual void delref() = 0;

@ -494,26 +494,6 @@ ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVer
});
}
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
ThreadFuture<Void> DLDatabase::createTenant(TenantNameRef const& tenantName) {
if (api->databaseAllocateTenant == nullptr) {
throw unsupported_operation();
}
FdbCApi::FDBFuture* f = api->databaseAllocateTenant(db, tenantName.begin(), tenantName.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// Deletes the tenant with the given name. The tenant must be empty.
ThreadFuture<Void> DLDatabase::deleteTenant(TenantNameRef const& tenantName) {
if (api->databaseRemoveTenant == nullptr) {
throw unsupported_operation();
}
FdbCApi::FDBFuture* f = api->databaseRemoveTenant(db, tenantName.begin(), tenantName.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// DLApi
// Loads the specified function from a dynamic library
@ -1413,22 +1393,6 @@ ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<P
return dbState->versionMonitorDb->getServerProtocol(expectedVersion);
}
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
ThreadFuture<Void> MultiVersionDatabase::createTenant(TenantNameRef const& tenantName) {
Standalone<StringRef> tenantNameCopy = tenantName;
Reference<MultiVersionDatabase> self = Reference<MultiVersionDatabase>::addRef(this);
return onMainThread([self, tenantNameCopy]() { return ManagementAPI::createTenant(self, tenantNameCopy); });
}
// Deletes the tenant with the given name. The tenant must be empty.
ThreadFuture<Void> MultiVersionDatabase::deleteTenant(TenantNameRef const& tenantName) {
Standalone<StringRef> tenantNameCopy = tenantName;
Reference<MultiVersionDatabase> self = Reference<MultiVersionDatabase>::addRef(this);
return onMainThread([self, tenantNameCopy]() { return ManagementAPI::deleteTenant(self, tenantNameCopy); });
}
MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb)
: dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))),
clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb), closed(false) {}

@ -421,12 +421,6 @@ public:
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
ThreadFuture<Void> createTenant(TenantNameRef const& tenantName) override;
// Deletes the tenant with the given name. The tenant must be empty.
ThreadFuture<Void> deleteTenant(TenantNameRef const& tenantName) override;
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
@ -686,12 +680,6 @@ public:
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
ThreadFuture<Void> createTenant(TenantNameRef const& tenantName) override;
// Deletes the tenant with the given name. The tenant must be empty.
ThreadFuture<Void> deleteTenant(TenantNameRef const& tenantName) override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }

@ -1368,6 +1368,12 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
if (apiVersionAtLeast(710)) {
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TenantMapRangeImpl>(SpecialKeySpace::getManagementApiCommandRange("tenantmap")));
}
if (apiVersionAtLeast(700)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ERRORMSG,
SpecialKeySpace::IMPLTYPE::READONLY,
@ -4992,7 +4998,6 @@ Future<Version> Transaction::getRawReadVersion() {
Future<Void> Transaction::watch(Reference<Watch> watch) {
++trState->cx->transactionWatchRequests;
trState->cx->addWatch();
watches.push_back(watch);
return ::watch(watch,

@ -55,6 +55,8 @@ static bool isAlphaNumeric(const std::string& key) {
}
} // namespace
const KeyRangeRef TenantMapRangeImpl::submoduleRange = KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr);
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
{ SpecialKeySpace::MODULE::TRANSACTION,
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")) },
@ -112,7 +114,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "datadistribution",
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "tenantmap", TenantMapRangeImpl::submoduleRange.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
};
std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiCommandToRange = {
@ -2699,3 +2702,96 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
// exclude locality with failed option as true.
return excludeLocalityCommitActor(ryw, true);
}
ACTOR Future<RangeResult> getTenantList(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
KeyRangeRef tenantRange =
kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
state KeyRef managementPrefix =
kr.begin.substr(0,
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
TenantMapRangeImpl::submoduleRange.begin.size());
std::map<TenantName, TenantMapEntry> tenants = wait(ManagementAPI::listTenantsTransaction(
Reference<ReadYourWritesTransaction>::addRef(ryw), tenantRange.begin, tenantRange.end, limitsHint.rows));
RangeResult results;
for (auto tenant : tenants) {
json_spirit::mObject tenantEntry;
tenantEntry["id"] = tenant.second.id;
tenantEntry["prefix"] = printable(tenant.second.prefix);
std::string tenantEntryString =
json_spirit::write_string(json_spirit::mValue(tenantEntry), json_spirit::Output_options::raw_utf8);
ValueRef tenantEntryBytes(results.arena(), tenantEntryString);
results.push_back(results.arena(),
KeyValueRef(tenant.first.withPrefix(managementPrefix, results.arena()), tenantEntryBytes));
}
return results;
}
TenantMapRangeImpl::TenantMapRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<RangeResult> TenantMapRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
return getTenantList(ryw, kr, limitsHint);
}
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName beginTenant, TenantName endTenant) {
std::map<TenantName, TenantMapEntry> tenants = wait(
ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
if (tenants.size() == CLIENT_KNOBS->TOO_MANY) {
TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
.detail("BeginTenant", beginTenant)
.detail("EndTenant", endTenant);
ryw->setSpecialKeySpaceErrorMsg("too many tenants to range delete");
throw special_keys_api_failure();
}
std::vector<Future<Void>> deleteFutures;
for (auto tenant : tenants) {
deleteFutures.push_back(ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenant.first));
}
wait(waitForAll(deleteFutures));
return Void();
}
Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransaction* ryw) {
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
std::vector<Future<Void>> tenantManagementFutures;
for (auto range : ranges) {
if (!range.value().first) {
continue;
}
TenantNameRef tenantName =
range.begin()
.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
if (range.value().second.present()) {
tenantManagementFutures.push_back(
success(ManagementAPI::createTenantTransaction(&ryw->getTransaction(), tenantName)));
} else {
// For a single key clear, just issue the delete
if (KeyRangeRef(range.begin(), range.end()).singleKeyRange()) {
tenantManagementFutures.push_back(
ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
} else {
TenantNameRef endTenant = range.end().removePrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
if (endTenant.startsWith(submoduleRange.begin)) {
endTenant = endTenant.removePrefix(submoduleRange.end);
} else {
endTenant = "\xff"_sr;
}
tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, endTenant));
}
}
}
return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
}

@ -528,5 +528,16 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class TenantMapRangeImpl : public SpecialKeyRangeRWImpl {
public:
const static KeyRangeRef submoduleRange;
explicit TenantMapRangeImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
#include "flow/unactorcompiler.h"
#endif

@ -116,24 +116,6 @@ ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<Pro
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
}
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
ThreadFuture<Void> ThreadSafeDatabase::createTenant(TenantNameRef const& name) {
DatabaseContext* db = this->db;
TenantName tenantNameCopy = name;
return onMainThread([db, tenantNameCopy]() -> Future<Void> {
return ManagementAPI::createTenant(Reference<DatabaseContext>::addRef(db), tenantNameCopy);
});
}
// Deletes the tenant with the given name. The tenant must be empty.
ThreadFuture<Void> ThreadSafeDatabase::deleteTenant(TenantNameRef const& name) {
DatabaseContext* db = this->db;
TenantName tenantNameCopy = name;
return onMainThread([db, tenantNameCopy]() -> Future<Void> {
return ManagementAPI::deleteTenant(Reference<DatabaseContext>::addRef(db), tenantNameCopy);
});
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);

@ -47,12 +47,6 @@ public:
ThreadFuture<ProtocolVersion> getServerProtocol(
Optional<ProtocolVersion> expectedVersion = Optional<ProtocolVersion>()) override;
// Registers a tenant with the given name. A prefix is automatically allocated for the tenant.
ThreadFuture<Void> createTenant(TenantNameRef const& name) override;
// Deletes the tenant with the given name. The tenant must be empty.
ThreadFuture<Void> deleteTenant(TenantNameRef const& name) override;
// Returns after a majority of coordination servers are available and have reported a leader. The
// cluster file therefore is valid, but the database might be unavailable.
ThreadFuture<Void> onConnected();