From c4b0f6eaae03e54a0cb19cf7f0dd946d458ff19a Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" <aj.beamon@snowflake.com> Date: Thu, 7 Jul 2022 01:12:49 -0700 Subject: [PATCH] Add an internal C API to support connection to a cluster using a connection string (#7438) * Add an internal C API to support memory connection records * Track shared state in the client using a unique and immutable cluster ID from the cluster * Add missing code to store the clusterId in the database state object * Update some arguments to pass by const& --- bindings/c/fdb_c.cpp | 6 + bindings/c/foundationdb/fdb_c_internal.h | 3 + documentation/sphinx/source/special-keys.rst | 20 +- fdbclient/ClusterConnectionFile.actor.cpp | 10 + fdbclient/MultiVersionTransaction.actor.cpp | 176 ++++++++++++------ fdbclient/NativeAPI.actor.cpp | 25 ++- fdbclient/SpecialKeySpace.actor.cpp | 3 +- fdbclient/ThreadSafeTransaction.cpp | 20 +- .../include/fdbclient/ClusterConnectionFile.h | 5 + .../include/fdbclient/CommitProxyInterface.h | 3 +- .../include/fdbclient/CoordinationInterface.h | 5 + fdbclient/include/fdbclient/IClientApi.h | 4 +- .../fdbclient/MultiVersionTransaction.h | 94 ++++++++-- .../include/fdbclient/SpecialKeySpace.actor.h | 1 + .../include/fdbclient/ThreadSafeTransaction.h | 3 +- fdbserver/ClusterController.actor.cpp | 13 +- fdbserver/TLogServer.actor.cpp | 2 +- .../include/fdbserver/ServerDBInfo.actor.h | 4 +- 18 files changed, 285 insertions(+), 112 deletions(-) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index b8b4300019..c97604b98c 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -382,6 +382,12 @@ extern "C" DLLEXPORT fdb_error_t fdb_create_database(const char* cluster_file_pa return fdb_create_database_impl(cluster_file_path, out_database); } +extern "C" DLLEXPORT fdb_error_t fdb_create_database_from_connection_string(const char* connection_string, + FDBDatabase** out_database) { + CATCH_AND_RETURN(*out_database = + 
(FDBDatabase*)API->createDatabaseFromConnectionString(connection_string).extractPtr();); +} + extern "C" DLLEXPORT fdb_error_t fdb_database_set_option(FDBDatabase* d, FDBDatabaseOption option, uint8_t const* value, diff --git a/bindings/c/foundationdb/fdb_c_internal.h b/bindings/c/foundationdb/fdb_c_internal.h index f1897a598e..2b1a2163c7 100644 --- a/bindings/c/foundationdb/fdb_c_internal.h +++ b/bindings/c/foundationdb/fdb_c_internal.h @@ -46,6 +46,9 @@ DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedStat DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr); +DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_create_database_from_connection_string(const char* connection_string, + FDBDatabase** out_database); + #ifdef __cplusplus } #endif diff --git a/documentation/sphinx/source/special-keys.rst b/documentation/sphinx/source/special-keys.rst index 83ffeacac1..1a278d19b4 100644 --- a/documentation/sphinx/source/special-keys.rst +++ b/documentation/sphinx/source/special-keys.rst @@ -17,18 +17,22 @@ Users will also (by default) see a ``special_keys_cross_module_read`` error if t The error is to save the user from the surprise of seeing the behavior of multiple modules in the same read. Users may opt out of these restrictions by setting the ``special_key_space_relaxed`` transaction option. -Each special key that existed before api version 630 is its own module. These are +Each special key that existed before api version 630 is its own module. These are: -#. ``\xff\xff/cluster_file_path`` See :ref:`cluster file client access <cluster-file-client-access>` -#. ``\xff\xff/status/json`` See :doc:`Machine-readable status <mr-status>` +#. ``\xff\xff/cluster_file_path`` - See :ref:`cluster file client access <cluster-file-client-access>` +#. 
``\xff\xff/status/json`` - See :doc:`Machine-readable status <mr-status>` -Prior to api version 630, it was also possible to read a range starting at -``\xff\xff/worker_interfaces``. This is mostly an implementation detail of fdbcli, +Prior to api version 630, it was also possible to read a range starting at ``\xff\xff/worker_interfaces``. This is mostly an implementation detail of fdbcli, but it's available in api version 630 as a module with prefix ``\xff\xff/worker_interfaces/``. -Api version 630 includes two new modules with prefixes -``\xff\xff/transaction/`` (information about the current transaction), and -``\xff\xff/metrics/`` (various metrics, not transactional). +Api version 630 includes two new modules: + +#. ``\xff\xff/transaction/`` - information about the current transaction +#. ``\xff\xff/metrics/`` - various metrics, not transactional + +Api version 720 includes one new module: + +#. ``\xff\xff/cluster_id`` - returns an immutable unique ID for a cluster Transaction module ------------------ diff --git a/fdbclient/ClusterConnectionFile.actor.cpp b/fdbclient/ClusterConnectionFile.actor.cpp index 01ecba277c..59b1aabd18 100644 --- a/fdbclient/ClusterConnectionFile.actor.cpp +++ b/fdbclient/ClusterConnectionFile.actor.cpp @@ -40,6 +40,16 @@ ClusterConnectionFile::ClusterConnectionFile(std::string const& filename, Cluste cs = contents; } +// Creates a cluster file from the given filename. If the filename is empty, attempts to load the default +// cluster file instead. +Reference<ClusterConnectionFile> ClusterConnectionFile::openOrDefault(std::string const& filename) { + return makeReference<ClusterConnectionFile>(lookupClusterFileName(filename).first); +} + +Reference<ClusterConnectionFile> ClusterConnectionFile::openOrDefault(const char* filename) { + return openOrDefault(std::string(filename == nullptr ? "" : filename)); +} + // Sets the connections string held by this object and persists it. 
Future<Void> ClusterConnectionFile::setAndPersistConnectionString(ClusterConnectionString const& conn) { ASSERT(filename.size()); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 57daade016..b08457a3ac 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -631,6 +631,11 @@ void DLApi::init() { loadClientFunction(&api->runNetwork, lib, fdbCPath, "fdb_run_network", headerVersion >= 0); loadClientFunction(&api->stopNetwork, lib, fdbCPath, "fdb_stop_network", headerVersion >= 0); loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610); + loadClientFunction(&api->createDatabaseFromConnectionString, + lib, + fdbCPath, + "fdb_create_database_from_connection_string", + headerVersion >= 720); loadClientFunction(&api->databaseOpenTenant, lib, fdbCPath, "fdb_database_open_tenant", headerVersion >= 710); loadClientFunction( @@ -864,6 +869,16 @@ Reference<IDatabase> DLApi::createDatabase(const char* clusterFilePath) { } } +Reference<IDatabase> DLApi::createDatabaseFromConnectionString(const char* connectionString) { + if (api->createDatabaseFromConnectionString == nullptr) { + throw unsupported_operation(); + } + + FdbCApi::FDBDatabase* db; + throwIfError(api->createDatabaseFromConnectionString(connectionString, &db)); + return Reference<IDatabase>(new DLDatabase(api, db)); +} + void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) { MutexHolder holder(lock); threadCompletionHooks.emplace_back(hook, hookParameter); @@ -1406,11 +1421,11 @@ void MultiVersionTenant::TenantState::close() { // MultiVersionDatabase MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, int threadIdx, - std::string clusterFilePath, + ClusterConnectionRecord const& connectionRecord, Reference<IDatabase> db, Reference<IDatabase> versionMonitorDb, bool openConnectors) - : dbState(new 
DatabaseState(clusterFilePath, versionMonitorDb)) { + : dbState(new DatabaseState(connectionRecord, versionMonitorDb)) { dbState->db = db; dbState->dbVar->set(db); if (openConnectors) { @@ -1420,7 +1435,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, api->runOnExternalClients(threadIdx, [this](Reference<ClientInfo> client) { dbState->addClient(client); }); - api->runOnExternalClientsAllThreads([&clusterFilePath](Reference<ClientInfo> client) { + api->runOnExternalClientsAllThreads([&connectionRecord](Reference<ClientInfo> client) { // This creates a database to initialize some client state on the external library. // We only do this on 6.2+ clients to avoid some bugs associated with older versions. // This deletes the new database immediately to discard its connections. @@ -1430,7 +1445,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, // to run this initialization in case the other fails, and it's safe to run them in parallel. if (client->protocolVersion.hasCloseUnusedConnection() && !client->initialized) { try { - Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str()); + Reference<IDatabase> newDb = connectionRecord.createDatabase(client->api); client->initialized = true; } catch (Error& e) { // This connection is not initialized. 
It is still possible to connect with it, @@ -1439,23 +1454,23 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, TraceEvent(SevWarnAlways, "FailedToInitializeExternalClient") .error(e) .detail("LibraryPath", client->libPath) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } }); // For clients older than 6.2 we create and maintain our database connection - api->runOnExternalClients(threadIdx, [this, &clusterFilePath](Reference<ClientInfo> client) { + api->runOnExternalClients(threadIdx, [this, &connectionRecord](Reference<ClientInfo> client) { if (!client->protocolVersion.hasCloseUnusedConnection()) { try { dbState->legacyDatabaseConnections[client->protocolVersion] = - client->api->createDatabase(clusterFilePath.c_str()); + connectionRecord.createDatabase(client->api); } catch (Error& e) { // This connection is discarded TraceEvent(SevWarnAlways, "FailedToCreateLegacyDatabaseConnection") .error(e) .detail("LibraryPath", client->libPath) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } }); @@ -1472,7 +1487,8 @@ MultiVersionDatabase::~MultiVersionDatabase() { // Create a MultiVersionDatabase that wraps an already created IDatabase object // For internal use in testing Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) { - return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, db, false)); + return Reference<IDatabase>(new MultiVersionDatabase( + MultiVersionApi::api, 0, ClusterConnectionRecord::fromConnectionString(""), db, db, false)); } Reference<ITenant> MultiVersionDatabase::openTenant(TenantNameRef tenantName) { @@ -1570,9 +1586,10 @@ ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<P return dbState->versionMonitorDb->getServerProtocol(expectedVersion); } -MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, 
Reference<IDatabase> versionMonitorDb) +MultiVersionDatabase::DatabaseState::DatabaseState(ClusterConnectionRecord const& connectionRecord, + Reference<IDatabase> versionMonitorDb) : dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))), - clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb), closed(false) {} + connectionRecord(connectionRecord), versionMonitorDb(versionMonitorDb), closed(false) {} // Adds a client (local or externally loaded) that can be used to connect to the cluster void MultiVersionDatabase::DatabaseState::addClient(Reference<ClientInfo> client) { @@ -1658,7 +1675,7 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion // When the protocol version changes, clear the corresponding entry in the shared state map // so it can be re-initialized. Only do so if there was a valid previous protocol version. if (dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) { - MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterFilePath, dbProtocolVersion.get()); + MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterId, dbProtocolVersion.get()); } dbProtocolVersion = protocolVersion; @@ -1673,13 +1690,13 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion Reference<IDatabase> newDb; try { - newDb = client->api->createDatabase(clusterFilePath.c_str()); + newDb = connectionRecord.createDatabase(client->api); } catch (Error& e) { TraceEvent(SevWarnAlways, "MultiVersionClientFailedToCreateDatabase") .error(e) .detail("LibraryPath", client->libPath) .detail("External", client->external) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); // Put the client in a disconnected state until the version changes again updateDatabase(Reference<IDatabase>(), Reference<ClientInfo>()); @@ -1748,35 +1765,45 @@ void MultiVersionDatabase::DatabaseState::updateDatabase(Reference<IDatabase> ne } else 
{ // For older clients that don't have an API to get the protocol version, we have to monitor it locally try { - versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str()); + versionMonitorDb = connectionRecord.createDatabase(MultiVersionApi::api->getLocalClient()->api); } catch (Error& e) { // We can't create a new database to monitor the cluster version. This means we will continue using the // previous one, which should hopefully continue to work. TraceEvent(SevWarnAlways, "FailedToCreateDatabaseForVersionMonitoring") .error(e) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } } else { // We don't have a database connection, so use the local client to monitor the protocol version db = Reference<IDatabase>(); try { - versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str()); + versionMonitorDb = connectionRecord.createDatabase(MultiVersionApi::api->getLocalClient()->api); } catch (Error& e) { // We can't create a new database to monitor the cluster version. This means we will continue using the // previous one, which should hopefully continue to work. 
TraceEvent(SevWarnAlways, "FailedToCreateDatabaseForVersionMonitoring") .error(e) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } if (db.isValid() && dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) { - auto updateResult = - MultiVersionApi::api->updateClusterSharedStateMap(clusterFilePath, dbProtocolVersion.get(), db); - auto handler = mapThreadFuture<Void, Void>(updateResult, [this](ErrorOr<Void> result) { - TraceEvent("ClusterSharedStateUpdated").detail("ClusterFilePath", clusterFilePath); + Future<std::string> updateResult = + MultiVersionApi::api->updateClusterSharedStateMap(connectionRecord, dbProtocolVersion.get(), db); + sharedStateUpdater = map(errorOr(updateResult), [this](ErrorOr<std::string> result) { + if (result.present()) { + clusterId = result.get(); + TraceEvent("ClusterSharedStateUpdated") + .detail("ClusterId", result.get()) + .detail("ProtocolVersion", dbProtocolVersion.get()); + } else { + TraceEvent(SevWarnAlways, "ClusterSharedStateUpdateError") + .error(result.getError()) + .detail("ConnectionRecord", connectionRecord) + .detail("ProtocolVersion", dbProtocolVersion.get()); + } dbVar->set(db); - return ErrorOr<Void>(Void()); + return Void(); }); } else { dbVar->set(db); @@ -2376,14 +2403,12 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* } // Creates an IDatabase object that represents a connection to the cluster -Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath) { +Reference<IDatabase> MultiVersionApi::createDatabase(ClusterConnectionRecord const& connectionRecord) { lock.enter(); if (!networkSetup) { lock.leave(); throw network_not_setup(); } - std::string clusterFile(clusterFilePath); - if (localClientDisabled) { ASSERT(!bypassMultiClientApi); @@ -2391,23 +2416,32 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath nextThread = (nextThread + 1) % threadCount; 
lock.leave(); - Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath); + Reference<IDatabase> localDb = connectionRecord.createDatabase(localClient->api); return Reference<IDatabase>( - new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>(), localDb)); + new MultiVersionDatabase(this, threadIdx, connectionRecord, Reference<IDatabase>(), localDb)); } lock.leave(); ASSERT_LE(threadCount, 1); - Reference<IDatabase> localDb = localClient->api->createDatabase(clusterFilePath); + Reference<IDatabase> localDb = connectionRecord.createDatabase(localClient->api); if (bypassMultiClientApi) { return localDb; } else { - return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, Reference<IDatabase>(), localDb)); + return Reference<IDatabase>( + new MultiVersionDatabase(this, 0, connectionRecord, Reference<IDatabase>(), localDb)); } } +Reference<IDatabase> MultiVersionApi::createDatabase(const char* clusterFilePath) { + return createDatabase(ClusterConnectionRecord::fromFile(clusterFilePath)); +} + +Reference<IDatabase> MultiVersionApi::createDatabaseFromConnectionString(const char* connectionString) { + return createDatabase(ClusterConnectionRecord::fromConnectionString(connectionString)); +} + void MultiVersionApi::updateSupportedVersions() { if (networkSetup) { Standalone<VectorRef<uint8_t>> versionStr; @@ -2432,55 +2466,79 @@ void MultiVersionApi::updateSupportedVersions() { } } -ThreadFuture<Void> MultiVersionApi::updateClusterSharedStateMap(std::string clusterFilePath, - ProtocolVersion dbProtocolVersion, - Reference<IDatabase> db) { - MutexHolder holder(lock); - if (clusterSharedStateMap.find(clusterFilePath) == clusterSharedStateMap.end()) { +// Must be called from the main thread +ACTOR Future<std::string> updateClusterSharedStateMapImpl(MultiVersionApi* self, + ClusterConnectionRecord connectionRecord, + ProtocolVersion dbProtocolVersion, + Reference<IDatabase> db) { + // The cluster ID will be the 
connection record string (either a filename or the connection string itself) + // in API versions before we could read the cluster ID. + state std::string clusterId = connectionRecord.toString(); + if (MultiVersionApi::apiVersionAtLeast(720)) { + state Reference<ITransaction> tr = db->createTransaction(); + loop { + try { + state ThreadFuture<Optional<Value>> clusterIdFuture = tr->get("\xff\xff/cluster_id"_sr); + Optional<Value> clusterIdVal = wait(safeThreadFutureToFuture(clusterIdFuture)); + ASSERT(clusterIdVal.present()); + clusterId = clusterIdVal.get().toString(); + ASSERT(UID::fromString(clusterId).isValid()); + break; + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } + } + + if (self->clusterSharedStateMap.find(clusterId) == self->clusterSharedStateMap.end()) { TraceEvent("CreatingClusterSharedState") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersion", dbProtocolVersion); - clusterSharedStateMap[clusterFilePath] = { db->createSharedState(), dbProtocolVersion }; + self->clusterSharedStateMap[clusterId] = { db->createSharedState(), dbProtocolVersion }; } else { - auto& sharedStateInfo = clusterSharedStateMap[clusterFilePath]; + auto& sharedStateInfo = self->clusterSharedStateMap[clusterId]; if (sharedStateInfo.protocolVersion != dbProtocolVersion) { // This situation should never happen, because we are connecting to the same cluster, // so the protocol version must be the same TraceEvent(SevError, "ClusterStateProtocolVersionMismatch") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersionExpected", dbProtocolVersion) .detail("ProtocolVersionFound", sharedStateInfo.protocolVersion); - return Void(); + return clusterId; } + TraceEvent("SettingClusterSharedState") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersion", dbProtocolVersion); - ThreadFuture<DatabaseSharedState*> 
entry = sharedStateInfo.sharedStateFuture; - return mapThreadFuture<DatabaseSharedState*, Void>(entry, [db](ErrorOr<DatabaseSharedState*> result) { - if (result.isError()) { - return ErrorOr<Void>(result.getError()); - } - auto ssPtr = result.get(); - db->setSharedState(ssPtr); - return ErrorOr<Void>(Void()); - }); + + state ThreadFuture<DatabaseSharedState*> entry = sharedStateInfo.sharedStateFuture; + DatabaseSharedState* sharedState = wait(safeThreadFutureToFuture(entry)); + db->setSharedState(sharedState); } - return Void(); + + return clusterId; } -void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePath, ProtocolVersion dbProtocolVersion) { - MutexHolder holder(lock); - auto mapEntry = clusterSharedStateMap.find(clusterFilePath); - // It can be that other database instances on the same cluster path are already upgraded and thus +// Must be called from the main thread +Future<std::string> MultiVersionApi::updateClusterSharedStateMap(ClusterConnectionRecord const& connectionRecord, + ProtocolVersion dbProtocolVersion, + Reference<IDatabase> db) { + return updateClusterSharedStateMapImpl(this, connectionRecord, dbProtocolVersion, db); +} + +// Must be called from the main thread +void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterId, ProtocolVersion dbProtocolVersion) { + auto mapEntry = clusterSharedStateMap.find(clusterId); + // It can be that other database instances on the same cluster are already upgraded and thus // have cleared or even created a new shared object entry if (mapEntry == clusterSharedStateMap.end()) { - TraceEvent("ClusterSharedStateMapEntryNotFound").detail("ClusterFilePath", clusterFilePath); + TraceEvent("ClusterSharedStateMapEntryNotFound").detail("ClusterId", clusterId); return; } auto sharedStateInfo = mapEntry->second; if (sharedStateInfo.protocolVersion != dbProtocolVersion) { TraceEvent("ClusterSharedStateClearSkipped") - .detail("ClusterFilePath", clusterFilePath) + 
.detail("ClusterId", clusterId) .detail("ProtocolVersionExpected", dbProtocolVersion) .detail("ProtocolVersionFound", sharedStateInfo.protocolVersion); return; @@ -2488,9 +2546,7 @@ void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePat auto ssPtr = sharedStateInfo.sharedStateFuture.get(); ssPtr->delRef(ssPtr); clusterSharedStateMap.erase(mapEntry); - TraceEvent("ClusterSharedStateCleared") - .detail("ClusterFilePath", clusterFilePath) - .detail("ProtocolVersion", dbProtocolVersion); + TraceEvent("ClusterSharedStateCleared").detail("ClusterId", clusterId).detail("ProtocolVersion", dbProtocolVersion); } std::vector<std::string> parseOptionValues(std::string valueStr) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8f7f0fe946..e1c9d6e82c 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1416,6 +1416,13 @@ KeyRangeRef toRelativeRange(KeyRangeRef range, KeyRef prefix) { } } +ACTOR Future<UID> getClusterId(Database db) { + while (!db->clientInfo->get().clusterId.isValid()) { + wait(db->clientInfo->onChange()); + } + return db->clientInfo->get().clusterId; +} + DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord, Reference<AsyncVar<ClientDBInfo>> clientInfo, Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator, @@ -1497,6 +1504,21 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection globalConfig = std::make_unique<GlobalConfig>(this); if (apiVersionAtLeast(720)) { + registerSpecialKeysImpl( + SpecialKeySpace::MODULE::CLUSTERID, + SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique<SingleSpecialKeyImpl>( + LiteralStringRef("\xff\xff/cluster_id"), [](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> { + try { + if (ryw->getDatabase().getPtr()) { + return map(getClusterId(ryw->getDatabase()), + [](UID id) { return Optional<Value>(StringRef(id.toString())); }); + } 
+ } catch (Error& e) { + return e; + } + return Optional<Value>(); + })); registerSpecialKeysImpl( SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE, @@ -2259,8 +2281,7 @@ Database Database::createDatabase(std::string connFileName, int apiVersion, IsInternal internal, LocalityData const& clientLocality) { - Reference<IClusterConnectionRecord> rccr = Reference<IClusterConnectionRecord>( - new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first)); + Reference<IClusterConnectionRecord> rccr = ClusterConnectionFile::openOrDefault(connFileName); return Database::createDatabase(rccr, apiVersion, internal, clientLocality); } diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index baa3803c08..85f237cb6d 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -78,7 +78,8 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) }, { SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"), - LiteralStringRef("\xff\xff/actor_profiler_conf0")) } + LiteralStringRef("\xff\xff/actor_profiler_conf0")) }, + { SpecialKeySpace::MODULE::CLUSTERID, singleKeyRange(LiteralStringRef("\xff\xff/cluster_id")) }, }; std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = { diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index f2935c20bb..1dc5357572 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -20,6 +20,7 @@ #include "fdbclient/BlobGranuleFiles.h" #include "fdbclient/ClusterConnectionFile.h" +#include "fdbclient/ClusterConnectionMemoryRecord.h" #include "fdbclient/ThreadSafeTransaction.h" #include "fdbclient/DatabaseContext.h" #include "fdbclient/versions.h" @@ 
-142,19 +143,14 @@ ThreadFuture<Void> ThreadSafeDatabase::waitPurgeGranulesComplete(const KeyRef& p return onMainThread([db, key]() -> Future<Void> { return db->waitPurgeGranulesComplete(key); }); } -ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) { - ClusterConnectionFile* connFile = - new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first); - +ThreadSafeDatabase::ThreadSafeDatabase(Reference<IClusterConnectionRecord> connectionRecord, int apiVersion) { // Allocate memory for the Database from this thread (so the pointer is known for subsequent method calls) // but run its constructor on the main thread DatabaseContext* db = this->db = DatabaseContext::allocateOnForeignThread(); - onMainThreadVoid([db, connFile, apiVersion]() { + onMainThreadVoid([db, connectionRecord, apiVersion]() { try { - Database::createDatabase( - Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::False, LocalityData(), db) - .extractPtr(); + Database::createDatabase(connectionRecord, apiVersion, IsInternal::False, LocalityData(), db).extractPtr(); } catch (Error& e) { new (db) DatabaseContext(e); } catch (...) 
{ @@ -635,7 +631,13 @@ void ThreadSafeApi::stopNetwork() { } Reference<IDatabase> ThreadSafeApi::createDatabase(const char* clusterFilePath) { - return Reference<IDatabase>(new ThreadSafeDatabase(clusterFilePath, apiVersion)); + return Reference<IDatabase>( + new ThreadSafeDatabase(ClusterConnectionFile::openOrDefault(clusterFilePath), apiVersion)); +} + +Reference<IDatabase> ThreadSafeApi::createDatabaseFromConnectionString(const char* connectionString) { + return Reference<IDatabase>(new ThreadSafeDatabase( + makeReference<ClusterConnectionMemoryRecord>(ClusterConnectionString(connectionString)), apiVersion)); } void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) { diff --git a/fdbclient/include/fdbclient/ClusterConnectionFile.h b/fdbclient/include/fdbclient/ClusterConnectionFile.h index eded1726e3..43181b0343 100644 --- a/fdbclient/include/fdbclient/ClusterConnectionFile.h +++ b/fdbclient/include/fdbclient/ClusterConnectionFile.h @@ -33,6 +33,11 @@ public: // Creates a cluster file with a given connection string and saves it to the specified file. explicit ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents); + // Creates a cluster file from the given filename. If the filename is empty, attempts to load the default + // cluster file instead. + static Reference<ClusterConnectionFile> openOrDefault(std::string const& filename); + static Reference<ClusterConnectionFile> openOrDefault(const char* filename); + // Sets the connections string held by this object and persists it. 
Future<Void> setAndPersistConnectionString(ClusterConnectionString const&) override; diff --git a/fdbclient/include/fdbclient/CommitProxyInterface.h b/fdbclient/include/fdbclient/CommitProxyInterface.h index 149e77521d..9a8095f808 100644 --- a/fdbclient/include/fdbclient/CommitProxyInterface.h +++ b/fdbclient/include/fdbclient/CommitProxyInterface.h @@ -116,6 +116,7 @@ struct ClientDBInfo { firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk Optional<Value> forward; std::vector<VersionHistory> history; + UID clusterId; TenantMode tenantMode; @@ -129,7 +130,7 @@ struct ClientDBInfo { if constexpr (!is_fb_function<Archive>) { ASSERT(ar.protocolVersion().isValid()); } - serializer(ar, grvProxies, commitProxies, id, forward, history, tenantMode); + serializer(ar, grvProxies, commitProxies, id, forward, history, tenantMode, clusterId); } }; diff --git a/fdbclient/include/fdbclient/CoordinationInterface.h b/fdbclient/include/fdbclient/CoordinationInterface.h index c94a7498cc..7ccaf9170a 100644 --- a/fdbclient/include/fdbclient/CoordinationInterface.h +++ b/fdbclient/include/fdbclient/CoordinationInterface.h @@ -157,6 +157,11 @@ private: bool connectionStringNeedsPersisted; }; +template <> +struct Traceable<IClusterConnectionRecord> : std::true_type { + static std::string toString(IClusterConnectionRecord const& record) { return record.toString(); } +}; + struct LeaderInfo { constexpr static FileIdentifier file_identifier = 8338794; // The first 7 bits of changeID represent cluster controller process class fitness, the lower the better diff --git a/fdbclient/include/fdbclient/IClientApi.h b/fdbclient/include/fdbclient/IClientApi.h index 8927286005..73e743d060 100644 --- a/fdbclient/include/fdbclient/IClientApi.h +++ b/fdbclient/include/fdbclient/IClientApi.h @@ -20,14 +20,13 @@ #ifndef FDBCLIENT_ICLIENTAPI_H #define FDBCLIENT_ICLIENTAPI_H -#include "flow/ProtocolVersion.h" #pragma once #include 
"fdbclient/FDBOptions.g.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/Tenant.h" - #include "fdbclient/Tracing.h" +#include "flow/ProtocolVersion.h" #include "flow/ThreadHelper.actor.h" struct VersionVector; @@ -199,6 +198,7 @@ public: virtual void stopNetwork() = 0; virtual Reference<IDatabase> createDatabase(const char* clusterFilePath) = 0; + virtual Reference<IDatabase> createDatabaseFromConnectionString(const char* connectionString) = 0; virtual void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) = 0; }; diff --git a/fdbclient/include/fdbclient/MultiVersionTransaction.h b/fdbclient/include/fdbclient/MultiVersionTransaction.h index c20b76f8b2..4a59872c23 100644 --- a/fdbclient/include/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/include/fdbclient/MultiVersionTransaction.h @@ -20,14 +20,13 @@ #ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H #define FDBCLIENT_MULTIVERSIONTRANSACTION_H -#include "flow/ProtocolVersion.h" #pragma once #include "fdbclient/fdb_c_options.g.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/IClientApi.h" - +#include "flow/ProtocolVersion.h" #include "flow/ThreadHelper.actor.h" // FdbCApi is used as a wrapper around the FoundationDB C API that gets loaded from an external client library. 
@@ -128,6 +127,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> { fdb_error_t (*runNetwork)(); fdb_error_t (*stopNetwork)(); fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db); + fdb_error_t (*createDatabaseFromConnectionString)(const char* connectionString, FDBDatabase** db); // Database fdb_error_t (*databaseOpenTenant)(FDBDatabase* database, @@ -498,9 +498,11 @@ public: void runNetwork() override; void stopNetwork() override; - Reference<IDatabase> createDatabase(const char* clusterFilePath) override; + Reference<IDatabase> createDatabase(const char* clusterFile) override; Reference<IDatabase> createDatabase609(const char* clusterFilePath); // legacy database creation + Reference<IDatabase> createDatabaseFromConnectionString(const char* connectionString) override; + void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override; private: @@ -722,6 +724,57 @@ public: Reference<TenantState> tenantState; }; +class ClusterConnectionRecord { +private: + enum class Type { FILE, CONNECTION_STRING }; + ClusterConnectionRecord(Type type, std::string const& recordStr) : type(type), recordStr(recordStr) {} + + Type type; + std::string recordStr; + +public: + static ClusterConnectionRecord fromFile(std::string const& clusterFilePath) { + return ClusterConnectionRecord(Type::FILE, clusterFilePath); + } + + static ClusterConnectionRecord fromConnectionString(std::string const& connectionString) { + return ClusterConnectionRecord(Type::CONNECTION_STRING, connectionString); + } + + Reference<IDatabase> createDatabase(IClientApi* api) const { + switch (type) { + case Type::FILE: + return api->createDatabase(recordStr.c_str()); + case Type::CONNECTION_STRING: + return api->createDatabaseFromConnectionString(recordStr.c_str()); + default: + ASSERT(false); + throw internal_error(); + } + } + + std::string toString() const { + switch (type) { + case Type::FILE: + if (recordStr.empty()) { + return "default file"; + } 
else { + return "file: " + recordStr; + } + case Type::CONNECTION_STRING: + return "connection string: " + recordStr; + default: + ASSERT(false); + throw internal_error(); + } + } +}; + +template <> +struct Traceable<ClusterConnectionRecord> : std::true_type { + static std::string toString(ClusterConnectionRecord const& connectionRecord) { return connectionRecord.toString(); } +}; + // An implementation of IDatabase that wraps a database created either locally or through a dynamically loaded // external client. The MultiVersionDatabase monitors the protocol version of the cluster and automatically // replaces the wrapped database when the protocol version changes. @@ -729,7 +782,7 @@ class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted< public: MultiVersionDatabase(MultiVersionApi* api, int threadIdx, - std::string clusterFilePath, + ClusterConnectionRecord const& connectionRecord, Reference<IDatabase> db, Reference<IDatabase> versionMonitorDb, bool openConnectors = true); @@ -771,7 +824,7 @@ public: // A struct that manages the current connection state of the MultiVersionDatabase. This wraps the underlying // IDatabase object that is currently interacting with the cluster. struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> { - DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb); + DatabaseState(ClusterConnectionRecord const& connectionRecord, Reference<IDatabase> versionMonitorDb); // Replaces the active database connection with a new one. Must be called from the main thread. void updateDatabase(Reference<IDatabase> newDb, Reference<ClientInfo> client); @@ -796,7 +849,8 @@ public: Reference<IDatabase> db; const Reference<ThreadSafeAsyncVar<Reference<IDatabase>>> dbVar; - std::string clusterFilePath; + ClusterConnectionRecord connectionRecord; + std::string clusterId; // Used to monitor the cluster protocol version. 
Will be the same as db unless we have either not connected // yet or if the client version associated with db does not support protocol monitoring. In those cases, @@ -809,6 +863,8 @@ public: ThreadFuture<Void> dbReady; ThreadFuture<Void> protocolVersionMonitor; + Future<Void> sharedStateUpdater; + // Versions older than 6.1 do not benefit from having their database connections closed. Additionally, // there are various issues that result in negative behavior in some cases if the connections are closed. // Therefore, we leave them open. @@ -873,7 +929,10 @@ public: void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override; // Creates an IDatabase object that represents a connection to the cluster + Reference<IDatabase> createDatabase(ClusterConnectionRecord const& connectionRecord); Reference<IDatabase> createDatabase(const char* clusterFilePath) override; + Reference<IDatabase> createDatabaseFromConnectionString(const char* connectionString) override; + static MultiVersionApi* api; Reference<ClientInfo> getLocalClient(); @@ -886,10 +945,18 @@ public: bool callbackOnMainThread; bool localClientDisabled; - ThreadFuture<Void> updateClusterSharedStateMap(std::string clusterFilePath, - ProtocolVersion dbProtocolVersion, - Reference<IDatabase> db); - void clearClusterSharedStateMapEntry(std::string clusterFilePath, ProtocolVersion dbProtocolVersion); + Future<std::string> updateClusterSharedStateMap(ClusterConnectionRecord const& connectionRecord, + ProtocolVersion dbProtocolVersion, + Reference<IDatabase> db); + void clearClusterSharedStateMapEntry(std::string clusterId, ProtocolVersion dbProtocolVersion); + + // Map of cluster ID -> DatabaseSharedState pointer Future + // Upon cluster version upgrade, clear the map entry for that cluster + struct SharedStateInfo { + ThreadFuture<DatabaseSharedState*> sharedStateFuture; + ProtocolVersion protocolVersion; + }; + std::map<std::string, SharedStateInfo> clusterSharedStateMap; static bool 
apiVersionAtLeast(int minVersion); @@ -913,13 +980,6 @@ private: Reference<ClientInfo> localClient; std::map<std::string, ClientDesc> externalClientDescriptions; std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients; - // Map of clusterFilePath -> DatabaseSharedState pointer Future - // Upon cluster version upgrade, clear the map entry for that cluster - struct SharedStateInfo { - ThreadFuture<DatabaseSharedState*> sharedStateFuture; - ProtocolVersion protocolVersion; - }; - std::map<std::string, SharedStateInfo> clusterSharedStateMap; bool networkStartSetup; volatile bool networkSetup; diff --git a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h index fcff6dbb0e..3a2c7f6b83 100644 --- a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h @@ -166,6 +166,7 @@ public: ACTORLINEAGE, // Sampling data ACTOR_PROFILER_CONF, // profiler configuration CLUSTERFILEPATH, + CLUSTERID, // An immutable UID for a cluster CONFIGURATION, // Configuration of the cluster CONNECTIONSTRING, ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space diff --git a/fdbclient/include/fdbclient/ThreadSafeTransaction.h b/fdbclient/include/fdbclient/ThreadSafeTransaction.h index 687a231e3e..875664ea76 100644 --- a/fdbclient/include/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/include/fdbclient/ThreadSafeTransaction.h @@ -72,7 +72,7 @@ private: DatabaseContext* db; public: // Internal use only - ThreadSafeDatabase(std::string connFilename, int apiVersion); + ThreadSafeDatabase(Reference<IClusterConnectionRecord> connectionRecord, int apiVersion); ThreadSafeDatabase(DatabaseContext* db) : db(db) {} DatabaseContext* unsafeGetPtr() const { return db; } }; @@ -212,6 +212,7 @@ public: void stopNetwork() override; Reference<IDatabase> createDatabase(const char* clusterFilePath) override; + Reference<IDatabase> 
createDatabaseFromConnectionString(const char* connectionString) override; void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 75babbb99a..b2bfa65529 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -250,6 +250,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, dbInfo.myLocality = db->serverInfo->get().myLocality; dbInfo.client = ClientDBInfo(); dbInfo.client.tenantMode = db->config.tenantMode; + dbInfo.client.clusterId = db->serverInfo->get().client.clusterId; TraceEvent("CCWDB", cluster->id) .detail("NewMaster", dbInfo.master.id().toString()) @@ -1022,7 +1023,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co // Construct the client information if (db->clientInfo->get().commitProxies != req.commitProxies || db->clientInfo->get().grvProxies != req.grvProxies || - db->clientInfo->get().tenantMode != db->config.tenantMode) { + db->clientInfo->get().tenantMode != db->config.tenantMode || db->clientInfo->get().clusterId != req.clusterId) { TraceEvent("PublishNewClientInfo", self->id) .detail("Master", dbInfo.master.id()) .detail("GrvProxies", db->clientInfo->get().grvProxies) @@ -1030,7 +1031,9 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co .detail("CommitProxies", db->clientInfo->get().commitProxies) .detail("ReqCPs", req.commitProxies) .detail("TenantMode", db->clientInfo->get().tenantMode.toString()) - .detail("ReqTenantMode", db->config.tenantMode.toString()); + .detail("ReqTenantMode", db->config.tenantMode.toString()) + .detail("ClusterId", db->clientInfo->get().clusterId) + .detail("ReqClusterId", req.clusterId); isChanged = true; // TODO why construct a new one and not just copy the old one and change proxies + id? 
ClientDBInfo clientInfo; @@ -1038,6 +1041,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co clientInfo.commitProxies = req.commitProxies; clientInfo.grvProxies = req.grvProxies; clientInfo.tenantMode = db->config.tenantMode; + clientInfo.clusterId = req.clusterId; db->clientInfo->set(clientInfo); dbInfo.client = db->clientInfo->get(); } @@ -1057,11 +1061,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co dbInfo.recoveryCount = req.recoveryCount; } - if (dbInfo.clusterId != req.clusterId) { - isChanged = true; - dbInfo.clusterId = req.clusterId; - } - if (isChanged) { dbInfo.id = deterministicRandom()->randomUniqueID(); dbInfo.infoGeneration = ++self->db.dbInfoCount; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index f9dd88598d..c09acc6f79 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -2626,7 +2626,7 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self, } // Persist cluster ID once cluster has recovered. 
- auto ccClusterId = self->dbInfo->get().clusterId; + auto ccClusterId = self->dbInfo->get().client.clusterId; if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED && !self->durableClusterId.isValid()) { ASSERT(ccClusterId.isValid()); diff --git a/fdbserver/include/fdbserver/ServerDBInfo.actor.h b/fdbserver/include/fdbserver/ServerDBInfo.actor.h index 9a1bd30de6..1a7a9a4211 100644 --- a/fdbserver/include/fdbserver/ServerDBInfo.actor.h +++ b/fdbserver/include/fdbserver/ServerDBInfo.actor.h @@ -65,7 +65,6 @@ struct ServerDBInfo { // which need to stay alive in case this recovery fails Optional<LatencyBandConfig> latencyBandConfig; int64_t infoGeneration; - UID clusterId; ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0), infoGeneration(0) {} @@ -91,8 +90,7 @@ struct ServerDBInfo { logSystemConfig, priorCommittedLogServers, latencyBandConfig, - infoGeneration, - clusterId); + infoGeneration); } };