diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index b8b4300019..c97604b98c 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -382,6 +382,12 @@ extern "C" DLLEXPORT fdb_error_t fdb_create_database(const char* cluster_file_pa return fdb_create_database_impl(cluster_file_path, out_database); } +extern "C" DLLEXPORT fdb_error_t fdb_create_database_from_connection_string(const char* connection_string, + FDBDatabase** out_database) { + CATCH_AND_RETURN(*out_database = + (FDBDatabase*)API->createDatabaseFromConnectionString(connection_string).extractPtr();); +} + extern "C" DLLEXPORT fdb_error_t fdb_database_set_option(FDBDatabase* d, FDBDatabaseOption option, uint8_t const* value, diff --git a/bindings/c/foundationdb/fdb_c_internal.h b/bindings/c/foundationdb/fdb_c_internal.h index f1897a598e..2b1a2163c7 100644 --- a/bindings/c/foundationdb/fdb_c_internal.h +++ b/bindings/c/foundationdb/fdb_c_internal.h @@ -46,6 +46,9 @@ DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedStat DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr); +DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_create_database_from_connection_string(const char* connection_string, + FDBDatabase** out_database); + #ifdef __cplusplus } #endif diff --git a/documentation/sphinx/source/special-keys.rst b/documentation/sphinx/source/special-keys.rst index 83ffeacac1..1a278d19b4 100644 --- a/documentation/sphinx/source/special-keys.rst +++ b/documentation/sphinx/source/special-keys.rst @@ -17,18 +17,22 @@ Users will also (by default) see a ``special_keys_cross_module_read`` error if t The error is to save the user from the surprise of seeing the behavior of multiple modules in the same read. Users may opt out of these restrictions by setting the ``special_key_space_relaxed`` transaction option. -Each special key that existed before api version 630 is its own module. 
These are +Each special key that existed before api version 630 is its own module. These are: -#. ``\xff\xff/cluster_file_path`` See :ref:`cluster file client access ` -#. ``\xff\xff/status/json`` See :doc:`Machine-readable status ` +#. ``\xff\xff/cluster_file_path`` - See :ref:`cluster file client access ` +#. ``\xff\xff/status/json`` - See :doc:`Machine-readable status ` -Prior to api version 630, it was also possible to read a range starting at -``\xff\xff/worker_interfaces``. This is mostly an implementation detail of fdbcli, +Prior to api version 630, it was also possible to read a range starting at ``\xff\xff/worker_interfaces``. This is mostly an implementation detail of fdbcli, but it's available in api version 630 as a module with prefix ``\xff\xff/worker_interfaces/``. -Api version 630 includes two new modules with prefixes -``\xff\xff/transaction/`` (information about the current transaction), and -``\xff\xff/metrics/`` (various metrics, not transactional). +Api version 630 includes two new modules: + +#. ``\xff\xff/transaction/`` - information about the current transaction +#. ``\xff\xff/metrics/`` - various metrics, not transactional + +Api version 720 includes one new module: + +#. ``\xff\xff/cluster_id`` - returns an immutable unique ID for a cluster Transaction module ------------------ diff --git a/fdbclient/ClusterConnectionFile.actor.cpp b/fdbclient/ClusterConnectionFile.actor.cpp index 01ecba277c..59b1aabd18 100644 --- a/fdbclient/ClusterConnectionFile.actor.cpp +++ b/fdbclient/ClusterConnectionFile.actor.cpp @@ -40,6 +40,16 @@ ClusterConnectionFile::ClusterConnectionFile(std::string const& filename, Cluste cs = contents; } +// Creates a cluster file from the given filename. If the filename is empty, attempts to load the default +// cluster file instead. 
+Reference ClusterConnectionFile::openOrDefault(std::string const& filename) { + return makeReference(lookupClusterFileName(filename).first); +} + +Reference ClusterConnectionFile::openOrDefault(const char* filename) { + return openOrDefault(std::string(filename == nullptr ? "" : filename)); +} + // Sets the connections string held by this object and persists it. Future ClusterConnectionFile::setAndPersistConnectionString(ClusterConnectionString const& conn) { ASSERT(filename.size()); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 57daade016..b08457a3ac 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -631,6 +631,11 @@ void DLApi::init() { loadClientFunction(&api->runNetwork, lib, fdbCPath, "fdb_run_network", headerVersion >= 0); loadClientFunction(&api->stopNetwork, lib, fdbCPath, "fdb_stop_network", headerVersion >= 0); loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610); + loadClientFunction(&api->createDatabaseFromConnectionString, + lib, + fdbCPath, + "fdb_create_database_from_connection_string", + headerVersion >= 720); loadClientFunction(&api->databaseOpenTenant, lib, fdbCPath, "fdb_database_open_tenant", headerVersion >= 710); loadClientFunction( @@ -864,6 +869,16 @@ Reference DLApi::createDatabase(const char* clusterFilePath) { } } +Reference DLApi::createDatabaseFromConnectionString(const char* connectionString) { + if (api->createDatabaseFromConnectionString == nullptr) { + throw unsupported_operation(); + } + + FdbCApi::FDBDatabase* db; + throwIfError(api->createDatabaseFromConnectionString(connectionString, &db)); + return Reference(new DLDatabase(api, db)); +} + void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) { MutexHolder holder(lock); threadCompletionHooks.emplace_back(hook, hookParameter); @@ -1406,11 +1421,11 @@ void 
MultiVersionTenant::TenantState::close() { // MultiVersionDatabase MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, int threadIdx, - std::string clusterFilePath, + ClusterConnectionRecord const& connectionRecord, Reference db, Reference versionMonitorDb, bool openConnectors) - : dbState(new DatabaseState(clusterFilePath, versionMonitorDb)) { + : dbState(new DatabaseState(connectionRecord, versionMonitorDb)) { dbState->db = db; dbState->dbVar->set(db); if (openConnectors) { @@ -1420,7 +1435,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, api->runOnExternalClients(threadIdx, [this](Reference client) { dbState->addClient(client); }); - api->runOnExternalClientsAllThreads([&clusterFilePath](Reference client) { + api->runOnExternalClientsAllThreads([&connectionRecord](Reference client) { // This creates a database to initialize some client state on the external library. // We only do this on 6.2+ clients to avoid some bugs associated with older versions. // This deletes the new database immediately to discard its connections. @@ -1430,7 +1445,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, // to run this initialization in case the other fails, and it's safe to run them in parallel. if (client->protocolVersion.hasCloseUnusedConnection() && !client->initialized) { try { - Reference newDb = client->api->createDatabase(clusterFilePath.c_str()); + Reference newDb = connectionRecord.createDatabase(client->api); client->initialized = true; } catch (Error& e) { // This connection is not initialized. 
It is still possible to connect with it, @@ -1439,23 +1454,23 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, TraceEvent(SevWarnAlways, "FailedToInitializeExternalClient") .error(e) .detail("LibraryPath", client->libPath) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } }); // For clients older than 6.2 we create and maintain our database connection - api->runOnExternalClients(threadIdx, [this, &clusterFilePath](Reference client) { + api->runOnExternalClients(threadIdx, [this, &connectionRecord](Reference client) { if (!client->protocolVersion.hasCloseUnusedConnection()) { try { dbState->legacyDatabaseConnections[client->protocolVersion] = - client->api->createDatabase(clusterFilePath.c_str()); + connectionRecord.createDatabase(client->api); } catch (Error& e) { // This connection is discarded TraceEvent(SevWarnAlways, "FailedToCreateLegacyDatabaseConnection") .error(e) .detail("LibraryPath", client->libPath) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } }); @@ -1472,7 +1487,8 @@ MultiVersionDatabase::~MultiVersionDatabase() { // Create a MultiVersionDatabase that wraps an already created IDatabase object // For internal use in testing Reference MultiVersionDatabase::debugCreateFromExistingDatabase(Reference db) { - return Reference(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, db, false)); + return Reference(new MultiVersionDatabase( + MultiVersionApi::api, 0, ClusterConnectionRecord::fromConnectionString(""), db, db, false)); } Reference MultiVersionDatabase::openTenant(TenantNameRef tenantName) { @@ -1570,9 +1586,10 @@ ThreadFuture MultiVersionDatabase::getServerProtocol(Optional

versionMonitorDb->getServerProtocol(expectedVersion); } -MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference versionMonitorDb) +MultiVersionDatabase::DatabaseState::DatabaseState(ClusterConnectionRecord const& connectionRecord, + Reference versionMonitorDb) : dbVar(new ThreadSafeAsyncVar>(Reference(nullptr))), - clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb), closed(false) {} + connectionRecord(connectionRecord), versionMonitorDb(versionMonitorDb), closed(false) {} // Adds a client (local or externally loaded) that can be used to connect to the cluster void MultiVersionDatabase::DatabaseState::addClient(Reference client) { @@ -1658,7 +1675,7 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion // When the protocol version changes, clear the corresponding entry in the shared state map // so it can be re-initialized. Only do so if there was a valid previous protocol version. if (dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) { - MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterFilePath, dbProtocolVersion.get()); + MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterId, dbProtocolVersion.get()); } dbProtocolVersion = protocolVersion; @@ -1673,13 +1690,13 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion Reference newDb; try { - newDb = client->api->createDatabase(clusterFilePath.c_str()); + newDb = connectionRecord.createDatabase(client->api); } catch (Error& e) { TraceEvent(SevWarnAlways, "MultiVersionClientFailedToCreateDatabase") .error(e) .detail("LibraryPath", client->libPath) .detail("External", client->external) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); // Put the client in a disconnected state until the version changes again updateDatabase(Reference(), Reference()); @@ -1748,35 +1765,45 @@ void 
MultiVersionDatabase::DatabaseState::updateDatabase(Reference ne } else { // For older clients that don't have an API to get the protocol version, we have to monitor it locally try { - versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str()); + versionMonitorDb = connectionRecord.createDatabase(MultiVersionApi::api->getLocalClient()->api); } catch (Error& e) { // We can't create a new database to monitor the cluster version. This means we will continue using the // previous one, which should hopefully continue to work. TraceEvent(SevWarnAlways, "FailedToCreateDatabaseForVersionMonitoring") .error(e) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } } else { // We don't have a database connection, so use the local client to monitor the protocol version db = Reference(); try { - versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str()); + versionMonitorDb = connectionRecord.createDatabase(MultiVersionApi::api->getLocalClient()->api); } catch (Error& e) { // We can't create a new database to monitor the cluster version. This means we will continue using the // previous one, which should hopefully continue to work. 
TraceEvent(SevWarnAlways, "FailedToCreateDatabaseForVersionMonitoring") .error(e) - .detail("ClusterFilePath", clusterFilePath); + .detail("ConnectionRecord", connectionRecord); } } if (db.isValid() && dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) { - auto updateResult = - MultiVersionApi::api->updateClusterSharedStateMap(clusterFilePath, dbProtocolVersion.get(), db); - auto handler = mapThreadFuture(updateResult, [this](ErrorOr result) { - TraceEvent("ClusterSharedStateUpdated").detail("ClusterFilePath", clusterFilePath); + Future updateResult = + MultiVersionApi::api->updateClusterSharedStateMap(connectionRecord, dbProtocolVersion.get(), db); + sharedStateUpdater = map(errorOr(updateResult), [this](ErrorOr result) { + if (result.present()) { + clusterId = result.get(); + TraceEvent("ClusterSharedStateUpdated") + .detail("ClusterId", result.get()) + .detail("ProtocolVersion", dbProtocolVersion.get()); + } else { + TraceEvent(SevWarnAlways, "ClusterSharedStateUpdateError") + .error(result.getError()) + .detail("ConnectionRecord", connectionRecord) + .detail("ProtocolVersion", dbProtocolVersion.get()); + } dbVar->set(db); - return ErrorOr(Void()); + return Void(); }); } else { dbVar->set(db); @@ -2376,14 +2403,12 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* } // Creates an IDatabase object that represents a connection to the cluster -Reference MultiVersionApi::createDatabase(const char* clusterFilePath) { +Reference MultiVersionApi::createDatabase(ClusterConnectionRecord const& connectionRecord) { lock.enter(); if (!networkSetup) { lock.leave(); throw network_not_setup(); } - std::string clusterFile(clusterFilePath); - if (localClientDisabled) { ASSERT(!bypassMultiClientApi); @@ -2391,23 +2416,32 @@ Reference MultiVersionApi::createDatabase(const char* clusterFilePath nextThread = (nextThread + 1) % threadCount; lock.leave(); - Reference localDb = localClient->api->createDatabase(clusterFilePath); + 
Reference localDb = connectionRecord.createDatabase(localClient->api); return Reference( - new MultiVersionDatabase(this, threadIdx, clusterFile, Reference(), localDb)); + new MultiVersionDatabase(this, threadIdx, connectionRecord, Reference(), localDb)); } lock.leave(); ASSERT_LE(threadCount, 1); - Reference localDb = localClient->api->createDatabase(clusterFilePath); + Reference localDb = connectionRecord.createDatabase(localClient->api); if (bypassMultiClientApi) { return localDb; } else { - return Reference(new MultiVersionDatabase(this, 0, clusterFile, Reference(), localDb)); + return Reference( + new MultiVersionDatabase(this, 0, connectionRecord, Reference(), localDb)); } } +Reference MultiVersionApi::createDatabase(const char* clusterFilePath) { + return createDatabase(ClusterConnectionRecord::fromFile(clusterFilePath)); +} + +Reference MultiVersionApi::createDatabaseFromConnectionString(const char* connectionString) { + return createDatabase(ClusterConnectionRecord::fromConnectionString(connectionString)); +} + void MultiVersionApi::updateSupportedVersions() { if (networkSetup) { Standalone> versionStr; @@ -2432,55 +2466,79 @@ void MultiVersionApi::updateSupportedVersions() { } } -ThreadFuture MultiVersionApi::updateClusterSharedStateMap(std::string clusterFilePath, - ProtocolVersion dbProtocolVersion, - Reference db) { - MutexHolder holder(lock); - if (clusterSharedStateMap.find(clusterFilePath) == clusterSharedStateMap.end()) { +// Must be called from the main thread +ACTOR Future updateClusterSharedStateMapImpl(MultiVersionApi* self, + ClusterConnectionRecord connectionRecord, + ProtocolVersion dbProtocolVersion, + Reference db) { + // The cluster ID will be the connection record string (either a filename or the connection string itself) + // in API versions before we could read the cluster ID. 
+ state std::string clusterId = connectionRecord.toString(); + if (MultiVersionApi::apiVersionAtLeast(720)) { + state Reference tr = db->createTransaction(); + loop { + try { + state ThreadFuture> clusterIdFuture = tr->get("\xff\xff/cluster_id"_sr); + Optional clusterIdVal = wait(safeThreadFutureToFuture(clusterIdFuture)); + ASSERT(clusterIdVal.present()); + clusterId = clusterIdVal.get().toString(); + ASSERT(UID::fromString(clusterId).isValid()); + break; + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } + } + + if (self->clusterSharedStateMap.find(clusterId) == self->clusterSharedStateMap.end()) { TraceEvent("CreatingClusterSharedState") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersion", dbProtocolVersion); - clusterSharedStateMap[clusterFilePath] = { db->createSharedState(), dbProtocolVersion }; + self->clusterSharedStateMap[clusterId] = { db->createSharedState(), dbProtocolVersion }; } else { - auto& sharedStateInfo = clusterSharedStateMap[clusterFilePath]; + auto& sharedStateInfo = self->clusterSharedStateMap[clusterId]; if (sharedStateInfo.protocolVersion != dbProtocolVersion) { // This situation should never happen, because we are connecting to the same cluster, // so the protocol version must be the same TraceEvent(SevError, "ClusterStateProtocolVersionMismatch") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersionExpected", dbProtocolVersion) .detail("ProtocolVersionFound", sharedStateInfo.protocolVersion); - return Void(); + return clusterId; } + TraceEvent("SettingClusterSharedState") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersion", dbProtocolVersion); - ThreadFuture entry = sharedStateInfo.sharedStateFuture; - return mapThreadFuture(entry, [db](ErrorOr result) { - if (result.isError()) { - return ErrorOr(result.getError()); - } - auto ssPtr = result.get(); - 
db->setSharedState(ssPtr); - return ErrorOr(Void()); - }); + + state ThreadFuture entry = sharedStateInfo.sharedStateFuture; + DatabaseSharedState* sharedState = wait(safeThreadFutureToFuture(entry)); + db->setSharedState(sharedState); } - return Void(); + + return clusterId; } -void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePath, ProtocolVersion dbProtocolVersion) { - MutexHolder holder(lock); - auto mapEntry = clusterSharedStateMap.find(clusterFilePath); - // It can be that other database instances on the same cluster path are already upgraded and thus +// Must be called from the main thread +Future MultiVersionApi::updateClusterSharedStateMap(ClusterConnectionRecord const& connectionRecord, + ProtocolVersion dbProtocolVersion, + Reference db) { + return updateClusterSharedStateMapImpl(this, connectionRecord, dbProtocolVersion, db); +} + +// Must be called from the main thread +void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterId, ProtocolVersion dbProtocolVersion) { + auto mapEntry = clusterSharedStateMap.find(clusterId); + // It can be that other database instances on the same cluster are already upgraded and thus // have cleared or even created a new shared object entry if (mapEntry == clusterSharedStateMap.end()) { - TraceEvent("ClusterSharedStateMapEntryNotFound").detail("ClusterFilePath", clusterFilePath); + TraceEvent("ClusterSharedStateMapEntryNotFound").detail("ClusterId", clusterId); return; } auto sharedStateInfo = mapEntry->second; if (sharedStateInfo.protocolVersion != dbProtocolVersion) { TraceEvent("ClusterSharedStateClearSkipped") - .detail("ClusterFilePath", clusterFilePath) + .detail("ClusterId", clusterId) .detail("ProtocolVersionExpected", dbProtocolVersion) .detail("ProtocolVersionFound", sharedStateInfo.protocolVersion); return; @@ -2488,9 +2546,7 @@ void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePat auto ssPtr = sharedStateInfo.sharedStateFuture.get(); 
ssPtr->delRef(ssPtr); clusterSharedStateMap.erase(mapEntry); - TraceEvent("ClusterSharedStateCleared") - .detail("ClusterFilePath", clusterFilePath) - .detail("ProtocolVersion", dbProtocolVersion); + TraceEvent("ClusterSharedStateCleared").detail("ClusterId", clusterId).detail("ProtocolVersion", dbProtocolVersion); } std::vector parseOptionValues(std::string valueStr) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8f7f0fe946..e1c9d6e82c 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1416,6 +1416,13 @@ KeyRangeRef toRelativeRange(KeyRangeRef range, KeyRef prefix) { } } +ACTOR Future getClusterId(Database db) { + while (!db->clientInfo->get().clusterId.isValid()) { + wait(db->clientInfo->onChange()); + } + return db->clientInfo->get().clusterId; +} + DatabaseContext::DatabaseContext(Reference>> connectionRecord, Reference> clientInfo, Reference> const> coordinator, @@ -1497,6 +1504,21 @@ DatabaseContext::DatabaseContext(Reference(this); if (apiVersionAtLeast(720)) { + registerSpecialKeysImpl( + SpecialKeySpace::MODULE::CLUSTERID, + SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique( + LiteralStringRef("\xff\xff/cluster_id"), [](ReadYourWritesTransaction* ryw) -> Future> { + try { + if (ryw->getDatabase().getPtr()) { + return map(getClusterId(ryw->getDatabase()), + [](UID id) { return Optional(StringRef(id.toString())); }); + } + } catch (Error& e) { + return e; + } + return Optional(); + })); registerSpecialKeysImpl( SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE, @@ -2259,8 +2281,7 @@ Database Database::createDatabase(std::string connFileName, int apiVersion, IsInternal internal, LocalityData const& clientLocality) { - Reference rccr = Reference( - new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first)); + Reference rccr = ClusterConnectionFile::openOrDefault(connFileName); return Database::createDatabase(rccr, apiVersion, 
internal, clientLocality); } diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index baa3803c08..85f237cb6d 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -78,7 +78,8 @@ std::unordered_map SpecialKeySpace::moduleToB KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) }, { SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"), - LiteralStringRef("\xff\xff/actor_profiler_conf0")) } + LiteralStringRef("\xff\xff/actor_profiler_conf0")) }, + { SpecialKeySpace::MODULE::CLUSTERID, singleKeyRange(LiteralStringRef("\xff\xff/cluster_id")) }, }; std::unordered_map SpecialKeySpace::managementApiCommandToRange = { diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index f2935c20bb..1dc5357572 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -20,6 +20,7 @@ #include "fdbclient/BlobGranuleFiles.h" #include "fdbclient/ClusterConnectionFile.h" +#include "fdbclient/ClusterConnectionMemoryRecord.h" #include "fdbclient/ThreadSafeTransaction.h" #include "fdbclient/DatabaseContext.h" #include "fdbclient/versions.h" @@ -142,19 +143,14 @@ ThreadFuture ThreadSafeDatabase::waitPurgeGranulesComplete(const KeyRef& p return onMainThread([db, key]() -> Future { return db->waitPurgeGranulesComplete(key); }); } -ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) { - ClusterConnectionFile* connFile = - new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first); - +ThreadSafeDatabase::ThreadSafeDatabase(Reference connectionRecord, int apiVersion) { // Allocate memory for the Database from this thread (so the pointer is known for subsequent method calls) // but run its constructor on the main thread DatabaseContext* db = this->db = DatabaseContext::allocateOnForeignThread(); 
- onMainThreadVoid([db, connFile, apiVersion]() { + onMainThreadVoid([db, connectionRecord, apiVersion]() { try { - Database::createDatabase( - Reference(connFile), apiVersion, IsInternal::False, LocalityData(), db) - .extractPtr(); + Database::createDatabase(connectionRecord, apiVersion, IsInternal::False, LocalityData(), db).extractPtr(); } catch (Error& e) { new (db) DatabaseContext(e); } catch (...) { @@ -635,7 +631,13 @@ void ThreadSafeApi::stopNetwork() { } Reference ThreadSafeApi::createDatabase(const char* clusterFilePath) { - return Reference(new ThreadSafeDatabase(clusterFilePath, apiVersion)); + return Reference( + new ThreadSafeDatabase(ClusterConnectionFile::openOrDefault(clusterFilePath), apiVersion)); +} + +Reference ThreadSafeApi::createDatabaseFromConnectionString(const char* connectionString) { + return Reference(new ThreadSafeDatabase( + makeReference(ClusterConnectionString(connectionString)), apiVersion)); } void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) { diff --git a/fdbclient/include/fdbclient/ClusterConnectionFile.h b/fdbclient/include/fdbclient/ClusterConnectionFile.h index eded1726e3..43181b0343 100644 --- a/fdbclient/include/fdbclient/ClusterConnectionFile.h +++ b/fdbclient/include/fdbclient/ClusterConnectionFile.h @@ -33,6 +33,11 @@ public: // Creates a cluster file with a given connection string and saves it to the specified file. explicit ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents); + // Creates a cluster file from the given filename. If the filename is empty, attempts to load the default + // cluster file instead. + static Reference openOrDefault(std::string const& filename); + static Reference openOrDefault(const char* filename); + // Sets the connections string held by this object and persists it. 
Future setAndPersistConnectionString(ClusterConnectionString const&) override; diff --git a/fdbclient/include/fdbclient/CommitProxyInterface.h b/fdbclient/include/fdbclient/CommitProxyInterface.h index 149e77521d..9a8095f808 100644 --- a/fdbclient/include/fdbclient/CommitProxyInterface.h +++ b/fdbclient/include/fdbclient/CommitProxyInterface.h @@ -116,6 +116,7 @@ struct ClientDBInfo { firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk Optional forward; std::vector history; + UID clusterId; TenantMode tenantMode; @@ -129,7 +130,7 @@ struct ClientDBInfo { if constexpr (!is_fb_function) { ASSERT(ar.protocolVersion().isValid()); } - serializer(ar, grvProxies, commitProxies, id, forward, history, tenantMode); + serializer(ar, grvProxies, commitProxies, id, forward, history, tenantMode, clusterId); } }; diff --git a/fdbclient/include/fdbclient/CoordinationInterface.h b/fdbclient/include/fdbclient/CoordinationInterface.h index c94a7498cc..7ccaf9170a 100644 --- a/fdbclient/include/fdbclient/CoordinationInterface.h +++ b/fdbclient/include/fdbclient/CoordinationInterface.h @@ -157,6 +157,11 @@ private: bool connectionStringNeedsPersisted; }; +template <> +struct Traceable : std::true_type { + static std::string toString(IClusterConnectionRecord const& record) { return record.toString(); } +}; + struct LeaderInfo { constexpr static FileIdentifier file_identifier = 8338794; // The first 7 bits of changeID represent cluster controller process class fitness, the lower the better diff --git a/fdbclient/include/fdbclient/IClientApi.h b/fdbclient/include/fdbclient/IClientApi.h index 8927286005..73e743d060 100644 --- a/fdbclient/include/fdbclient/IClientApi.h +++ b/fdbclient/include/fdbclient/IClientApi.h @@ -20,14 +20,13 @@ #ifndef FDBCLIENT_ICLIENTAPI_H #define FDBCLIENT_ICLIENTAPI_H -#include "flow/ProtocolVersion.h" #pragma once #include "fdbclient/FDBOptions.g.h" #include "fdbclient/FDBTypes.h" #include 
"fdbclient/Tenant.h" - #include "fdbclient/Tracing.h" +#include "flow/ProtocolVersion.h" #include "flow/ThreadHelper.actor.h" struct VersionVector; @@ -199,6 +198,7 @@ public: virtual void stopNetwork() = 0; virtual Reference createDatabase(const char* clusterFilePath) = 0; + virtual Reference createDatabaseFromConnectionString(const char* connectionString) = 0; virtual void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) = 0; }; diff --git a/fdbclient/include/fdbclient/MultiVersionTransaction.h b/fdbclient/include/fdbclient/MultiVersionTransaction.h index c20b76f8b2..4a59872c23 100644 --- a/fdbclient/include/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/include/fdbclient/MultiVersionTransaction.h @@ -20,14 +20,13 @@ #ifndef FDBCLIENT_MULTIVERSIONTRANSACTION_H #define FDBCLIENT_MULTIVERSIONTRANSACTION_H -#include "flow/ProtocolVersion.h" #pragma once #include "fdbclient/fdb_c_options.g.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/IClientApi.h" - +#include "flow/ProtocolVersion.h" #include "flow/ThreadHelper.actor.h" // FdbCApi is used as a wrapper around the FoundationDB C API that gets loaded from an external client library. 
@@ -128,6 +127,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { fdb_error_t (*runNetwork)(); fdb_error_t (*stopNetwork)(); fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db); + fdb_error_t (*createDatabaseFromConnectionString)(const char* connectionString, FDBDatabase** db); // Database fdb_error_t (*databaseOpenTenant)(FDBDatabase* database, @@ -498,9 +498,11 @@ public: void runNetwork() override; void stopNetwork() override; - Reference createDatabase(const char* clusterFilePath) override; + Reference createDatabase(const char* clusterFile) override; Reference createDatabase609(const char* clusterFilePath); // legacy database creation + Reference createDatabaseFromConnectionString(const char* connectionString) override; + void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override; private: @@ -722,6 +724,57 @@ public: Reference tenantState; }; +class ClusterConnectionRecord { +private: + enum class Type { FILE, CONNECTION_STRING }; + ClusterConnectionRecord(Type type, std::string const& recordStr) : type(type), recordStr(recordStr) {} + + Type type; + std::string recordStr; + +public: + static ClusterConnectionRecord fromFile(std::string const& clusterFilePath) { + return ClusterConnectionRecord(Type::FILE, clusterFilePath); + } + + static ClusterConnectionRecord fromConnectionString(std::string const& connectionString) { + return ClusterConnectionRecord(Type::CONNECTION_STRING, connectionString); + } + + Reference createDatabase(IClientApi* api) const { + switch (type) { + case Type::FILE: + return api->createDatabase(recordStr.c_str()); + case Type::CONNECTION_STRING: + return api->createDatabaseFromConnectionString(recordStr.c_str()); + default: + ASSERT(false); + throw internal_error(); + } + } + + std::string toString() const { + switch (type) { + case Type::FILE: + if (recordStr.empty()) { + return "default file"; + } else { + return "file: " + recordStr; + } + case Type::CONNECTION_STRING: + 
return "connection string: " + recordStr; + default: + ASSERT(false); + throw internal_error(); + } + } +}; + +template <> +struct Traceable : std::true_type { + static std::string toString(ClusterConnectionRecord const& connectionRecord) { return connectionRecord.toString(); } +}; + // An implementation of IDatabase that wraps a database created either locally or through a dynamically loaded // external client. The MultiVersionDatabase monitors the protocol version of the cluster and automatically // replaces the wrapped database when the protocol version changes. @@ -729,7 +782,7 @@ class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted< public: MultiVersionDatabase(MultiVersionApi* api, int threadIdx, - std::string clusterFilePath, + ClusterConnectionRecord const& connectionRecord, Reference db, Reference versionMonitorDb, bool openConnectors = true); @@ -771,7 +824,7 @@ public: // A struct that manages the current connection state of the MultiVersionDatabase. This wraps the underlying // IDatabase object that is currently interacting with the cluster. struct DatabaseState : ThreadSafeReferenceCounted { - DatabaseState(std::string clusterFilePath, Reference versionMonitorDb); + DatabaseState(ClusterConnectionRecord const& connectionRecord, Reference versionMonitorDb); // Replaces the active database connection with a new one. Must be called from the main thread. void updateDatabase(Reference newDb, Reference client); @@ -796,7 +849,8 @@ public: Reference db; const Reference>> dbVar; - std::string clusterFilePath; + ClusterConnectionRecord connectionRecord; + std::string clusterId; // Used to monitor the cluster protocol version. Will be the same as db unless we have either not connected // yet or if the client version associated with db does not support protocol monitoring. 
In those cases, @@ -809,6 +863,8 @@ public: ThreadFuture dbReady; ThreadFuture protocolVersionMonitor; + Future sharedStateUpdater; + // Versions older than 6.1 do not benefit from having their database connections closed. Additionally, // there are various issues that result in negative behavior in some cases if the connections are closed. // Therefore, we leave them open. @@ -873,7 +929,10 @@ public: void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override; // Creates an IDatabase object that represents a connection to the cluster + Reference createDatabase(ClusterConnectionRecord const& connectionRecord); Reference createDatabase(const char* clusterFilePath) override; + Reference createDatabaseFromConnectionString(const char* connectionString) override; + static MultiVersionApi* api; Reference getLocalClient(); @@ -886,10 +945,18 @@ public: bool callbackOnMainThread; bool localClientDisabled; - ThreadFuture updateClusterSharedStateMap(std::string clusterFilePath, - ProtocolVersion dbProtocolVersion, - Reference db); - void clearClusterSharedStateMapEntry(std::string clusterFilePath, ProtocolVersion dbProtocolVersion); + Future updateClusterSharedStateMap(ClusterConnectionRecord const& connectionRecord, + ProtocolVersion dbProtocolVersion, + Reference db); + void clearClusterSharedStateMapEntry(std::string clusterId, ProtocolVersion dbProtocolVersion); + + // Map of cluster ID -> DatabaseSharedState pointer Future + // Upon cluster version upgrade, clear the map entry for that cluster + struct SharedStateInfo { + ThreadFuture sharedStateFuture; + ProtocolVersion protocolVersion; + }; + std::map clusterSharedStateMap; static bool apiVersionAtLeast(int minVersion); @@ -913,13 +980,6 @@ private: Reference localClient; std::map externalClientDescriptions; std::map>> externalClients; - // Map of clusterFilePath -> DatabaseSharedState pointer Future - // Upon cluster version upgrade, clear the map entry for that cluster - struct 
SharedStateInfo { - ThreadFuture sharedStateFuture; - ProtocolVersion protocolVersion; - }; - std::map clusterSharedStateMap; bool networkStartSetup; volatile bool networkSetup; diff --git a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h index fcff6dbb0e..3a2c7f6b83 100644 --- a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h @@ -166,6 +166,7 @@ public: ACTORLINEAGE, // Sampling data ACTOR_PROFILER_CONF, // profiler configuration CLUSTERFILEPATH, + CLUSTERID, // An immutable UID for a cluster CONFIGURATION, // Configuration of the cluster CONNECTIONSTRING, ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space diff --git a/fdbclient/include/fdbclient/ThreadSafeTransaction.h b/fdbclient/include/fdbclient/ThreadSafeTransaction.h index 687a231e3e..875664ea76 100644 --- a/fdbclient/include/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/include/fdbclient/ThreadSafeTransaction.h @@ -72,7 +72,7 @@ private: DatabaseContext* db; public: // Internal use only - ThreadSafeDatabase(std::string connFilename, int apiVersion); + ThreadSafeDatabase(Reference connectionRecord, int apiVersion); ThreadSafeDatabase(DatabaseContext* db) : db(db) {} DatabaseContext* unsafeGetPtr() const { return db; } }; @@ -212,6 +212,7 @@ public: void stopNetwork() override; Reference createDatabase(const char* clusterFilePath) override; + Reference createDatabaseFromConnectionString(const char* connectionString) override; void addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) override; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 75babbb99a..b2bfa65529 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -250,6 +250,7 @@ ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, dbInfo.myLocality = 
db->serverInfo->get().myLocality; dbInfo.client = ClientDBInfo(); dbInfo.client.tenantMode = db->config.tenantMode; + dbInfo.client.clusterId = db->serverInfo->get().client.clusterId; TraceEvent("CCWDB", cluster->id) .detail("NewMaster", dbInfo.master.id().toString()) @@ -1022,7 +1023,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co // Construct the client information if (db->clientInfo->get().commitProxies != req.commitProxies || db->clientInfo->get().grvProxies != req.grvProxies || - db->clientInfo->get().tenantMode != db->config.tenantMode) { + db->clientInfo->get().tenantMode != db->config.tenantMode || db->clientInfo->get().clusterId != req.clusterId) { TraceEvent("PublishNewClientInfo", self->id) .detail("Master", dbInfo.master.id()) .detail("GrvProxies", db->clientInfo->get().grvProxies) @@ -1030,7 +1031,9 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co .detail("CommitProxies", db->clientInfo->get().commitProxies) .detail("ReqCPs", req.commitProxies) .detail("TenantMode", db->clientInfo->get().tenantMode.toString()) - .detail("ReqTenantMode", db->config.tenantMode.toString()); + .detail("ReqTenantMode", db->config.tenantMode.toString()) + .detail("ClusterId", db->clientInfo->get().clusterId) + .detail("ReqClusterId", req.clusterId); isChanged = true; // TODO why construct a new one and not just copy the old one and change proxies + id? 
ClientDBInfo clientInfo; @@ -1038,6 +1041,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co clientInfo.commitProxies = req.commitProxies; clientInfo.grvProxies = req.grvProxies; clientInfo.tenantMode = db->config.tenantMode; + clientInfo.clusterId = req.clusterId; db->clientInfo->set(clientInfo); dbInfo.client = db->clientInfo->get(); } @@ -1057,11 +1061,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co dbInfo.recoveryCount = req.recoveryCount; } - if (dbInfo.clusterId != req.clusterId) { - isChanged = true; - dbInfo.clusterId = req.clusterId; - } - if (isChanged) { dbInfo.id = deterministicRandom()->randomUniqueID(); dbInfo.infoGeneration = ++self->db.dbInfoCount; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index f9dd88598d..c09acc6f79 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -2626,7 +2626,7 @@ ACTOR Future serveTLogInterface(TLogData* self, } // Persist cluster ID once cluster has recovered. 
- auto ccClusterId = self->dbInfo->get().clusterId; + auto ccClusterId = self->dbInfo->get().client.clusterId; if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED && !self->durableClusterId.isValid()) { ASSERT(ccClusterId.isValid()); diff --git a/fdbserver/include/fdbserver/ServerDBInfo.actor.h b/fdbserver/include/fdbserver/ServerDBInfo.actor.h index 9a1bd30de6..1a7a9a4211 100644 --- a/fdbserver/include/fdbserver/ServerDBInfo.actor.h +++ b/fdbserver/include/fdbserver/ServerDBInfo.actor.h @@ -65,7 +65,6 @@ struct ServerDBInfo { // which need to stay alive in case this recovery fails Optional latencyBandConfig; int64_t infoGeneration; - UID clusterId; ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0), infoGeneration(0) {} @@ -91,8 +90,7 @@ struct ServerDBInfo { logSystemConfig, priorCommittedLogServers, latencyBandConfig, - infoGeneration, - clusterId); + infoGeneration); } };