diff --git a/fdbclient/GlobalConfig.actor.cpp b/fdbclient/GlobalConfig.actor.cpp index 5fa901df0e..b5bba35b5b 100644 --- a/fdbclient/GlobalConfig.actor.cpp +++ b/fdbclient/GlobalConfig.actor.cpp @@ -34,16 +34,7 @@ const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_inf const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate"); const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost"); -GlobalConfig::GlobalConfig() : lastUpdate(0) {} - -void GlobalConfig::create(DatabaseContext* cx, Reference> dbInfo) { - if (g_network->global(INetwork::enGlobalConfig) == nullptr) { - auto config = new GlobalConfig{}; - config->cx = Database(cx); - g_network->setGlobal(INetwork::enGlobalConfig, config); - config->_updater = updater(config, dbInfo); - } -} +GlobalConfig::GlobalConfig(Database& cx) : cx(cx), lastUpdate(0) {} GlobalConfig& GlobalConfig::globalConfig() { void* res = g_network->global(INetwork::enGlobalConfig); @@ -77,6 +68,14 @@ Future GlobalConfig::onInitialized() { return initialized.getFuture(); } +Future GlobalConfig::onChange() { + return configChanged.onTrigger(); +} + +void GlobalConfig::trigger(KeyRef key, std::function)> fn) { + callbacks.emplace(key, std::move(fn)); +} + void GlobalConfig::insert(KeyRef key, ValueRef value) { data.erase(key); @@ -89,6 +88,8 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) { any = StringRef(arena, t.getString(0).contents()); } else if (t.getType(0) == Tuple::ElementType::INT) { any = t.getInt(0); + } else if (t.getType(0) == Tuple::ElementType::BOOL) { + any = t.getBool(0); } else if (t.getType(0) == Tuple::ElementType::FLOAT) { any = t.getFloat(0); } else if (t.getType(0) == Tuple::ElementType::DOUBLE) { @@ -97,19 +98,26 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) { ASSERT(false); } data[stableKey] = makeReference(std::move(arena), std::move(any)); + + if (callbacks.find(stableKey) != callbacks.end()) { + callbacks[stableKey](data[stableKey]->value); + } } catch (Error& e) { - TraceEvent("GlobalConfigTupleParseError").detail("What", e.what()); + TraceEvent(SevWarn, "GlobalConfigTupleParseError").detail("What", e.what()); } } -void GlobalConfig::erase(KeyRef key) { - data.erase(key); +void GlobalConfig::erase(Key key) { + erase(KeyRangeRef(key, keyAfter(key))); } void GlobalConfig::erase(KeyRangeRef range) { auto it = data.begin(); while (it != data.end()) { if (range.contains(it->first)) { + if (callbacks.find(it->first) != callbacks.end()) { + callbacks[it->first](std::nullopt); + } it = data.erase(it); } else { ++it; @@ -134,36 +142,39 @@ ACTOR Future GlobalConfig::migrate(GlobalConfig* self) { state Optional sampleRate = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr))); state Optional sizeLimit = wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr))); - loop { - try { - tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); - // The value doesn't matter too much, as long as the key is set. - tr->set(migratedKey.contents(), "1"_sr); - if (sampleRate.present()) { - const double sampleRateDbl = - BinaryReader::fromStringRef(sampleRate.get().contents(), Unversioned()); - Tuple rate = Tuple().appendDouble(sampleRateDbl); - tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack()); - } - if (sizeLimit.present()) { - const int64_t sizeLimitInt = - BinaryReader::fromStringRef(sizeLimit.get().contents(), Unversioned()); - Tuple size = Tuple().append(sizeLimitInt); - tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack()); - } - - wait(tr->commit()); - return Void(); - } catch (Error& e) { - throw; + try { + tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + // The value doesn't matter too much, as long as the key is set. + tr->set(migratedKey.contents(), "1"_sr); + if (sampleRate.present()) { + const double sampleRateDbl = + BinaryReader::fromStringRef(sampleRate.get().contents(), Unversioned()); + Tuple rate = Tuple().appendDouble(sampleRateDbl); + tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack()); } + if (sizeLimit.present()) { + const int64_t sizeLimitInt = + BinaryReader::fromStringRef(sizeLimit.get().contents(), Unversioned()); + Tuple size = Tuple().append(sizeLimitInt); + tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack()); + } + + wait(tr->commit()); + } catch (Error& e) { + // If multiple fdbserver processes are started at once, they will all + // attempt this migration at the same time, sometimes resulting in + // aborts due to conflicts. Purposefully avoid retrying, making this + // migration best-effort. + TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what()); } + + return Void(); } // Updates local copy of global configuration by reading the entire key-range // from storage. ACTOR Future GlobalConfig::refresh(GlobalConfig* self) { - self->data.clear(); + self->erase(KeyRangeRef(""_sr, "\xff"_sr)); Transaction tr(self->cx); RangeResult result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY)); @@ -176,7 +187,8 @@ ACTOR Future GlobalConfig::refresh(GlobalConfig* self) { // Applies updates to the local copy of the global configuration when this // process receives an updated history. -ACTOR Future GlobalConfig::updater(GlobalConfig* self, Reference> dbInfo) { +ACTOR Future GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* dbInfo) { + wait(self->cx->onConnected()); wait(self->migrate(self)); wait(self->refresh(self)); @@ -184,9 +196,9 @@ ACTOR Future GlobalConfig::updater(GlobalConfig* self, ReferenceonChange()); + wait(self->dbInfoChanged.onTrigger()); - auto& history = dbInfo->get().history; + auto& history = dbInfo->history; if (history.size() == 0) { continue; } @@ -196,8 +208,8 @@ ACTOR Future GlobalConfig::updater(GlobalConfig* self, Referencerefresh(self)); - if (dbInfo->get().history.size() > 0) { - self->lastUpdate = dbInfo->get().history.back().version; + if (dbInfo->history.size() > 0) { + self->lastUpdate = dbInfo->history.back().version; } } else { // Apply history in order, from lowest version to highest @@ -222,6 +234,8 @@ ACTOR Future GlobalConfig::updater(GlobalConfig* self, ReferencelastUpdate = vh.version; } } + + self->configChanged.trigger(); } catch (Error& e) { throw; } diff --git a/fdbclient/GlobalConfig.actor.h b/fdbclient/GlobalConfig.actor.h index 5c3693f450..816c0933af 100644 --- a/fdbclient/GlobalConfig.actor.h +++ b/fdbclient/GlobalConfig.actor.h @@ -62,10 +62,28 @@ struct ConfigValue : ReferenceCounted { class GlobalConfig : NonCopyable { public: - // Creates a GlobalConfig singleton, accessed by calling GlobalConfig(). - // This function should only be called once by each process (however, it is - // idempotent and calling it multiple times will have no effect). - static void create(DatabaseContext* cx, Reference> dbInfo); + // Creates a GlobalConfig singleton, accessed by calling + // GlobalConfig::globalConfig(). This function requires a database object + // to allow global configuration to run transactions on the database, and + // an AsyncVar object to watch for changes on. The ClientDBInfo pointer + // should point to a ClientDBInfo object which will contain the updated + // global configuration history when the given AsyncVar changes. This + // function should be called whenever the database object changes, in order + // to allow global configuration to run transactions on the latest + // database. + template + static void create(Database& cx, Reference> db, const ClientDBInfo* dbInfo) { + if (g_network->global(INetwork::enGlobalConfig) == nullptr) { + auto config = new GlobalConfig{cx}; + g_network->setGlobal(INetwork::enGlobalConfig, config); + config->_updater = updater(config, dbInfo); + // Bind changes in `db` to the `dbInfoChanged` AsyncTrigger. + forward(db, std::addressof(config->dbInfoChanged)); + } else { + GlobalConfig* oldConfig = reinterpret_cast(g_network->global(INetwork::enGlobalConfig)); + oldConfig->cx = cx; + } + } // Returns a reference to the global GlobalConfig object. Clients should // call this function whenever they need to read a value out of the global @@ -114,8 +132,18 @@ public: // been created and is ready. Future onInitialized(); + // Triggers the returned future when any key-value pair in the global + // configuration changes. + Future onChange(); + + // Calls \ref fn when the value associated with \ref key is changed. \ref + // fn is passed the updated value for the key, or an empty optional if the + // key has been cleared. If the value is an allocated object, its memory + // remains in the control of the global configuration. + void trigger(KeyRef key, std::function)> fn); + private: - GlobalConfig(); + GlobalConfig(Database& cx); // The functions below only affect the local copy of the global // configuration keyspace! To insert or remove values across all nodes you @@ -127,20 +155,23 @@ private: void insert(KeyRef key, ValueRef value); // Removes the given key (and associated value) from the local copy of the // global configuration keyspace. - void erase(KeyRef key); + void erase(Key key); // Removes the given key range (and associated values) from the local copy // of the global configuration keyspace. void erase(KeyRangeRef range); ACTOR static Future migrate(GlobalConfig* self); ACTOR static Future refresh(GlobalConfig* self); - ACTOR static Future updater(GlobalConfig* self, Reference> dbInfo); + ACTOR static Future updater(GlobalConfig* self, const ClientDBInfo* dbInfo); Database cx; + AsyncTrigger dbInfoChanged; Future _updater; Promise initialized; + AsyncTrigger configChanged; std::unordered_map> data; Version lastUpdate; + std::unordered_map)>> callbacks; }; #endif diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index ccbf992c2f..5e07c1c38d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1152,8 +1152,6 @@ DatabaseContext::DatabaseContext(Reference connFile, /*switchable*/ true); } - return Database(db); + auto database = Database(db); + GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get())); + return database; } Database Database::createDatabase(std::string connFileName, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index ade50d2e7b..d51d27a23c 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1384,6 +1384,9 @@ Future GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw, K } else if (config->value.type() == typeid(int64_t)) { result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast(config->value)))); + } else if (config->value.type() == typeid(bool)) { + result.push_back_deep(result.arena(), + KeyValueRef(prefixedKey, std::to_string(std::any_cast(config->value)))); } else if (config->value.type() == typeid(float)) { result.push_back_deep(result.arena(), KeyValueRef(prefixedKey, std::to_string(std::any_cast(config->value)))); diff --git a/fdbclient/Tuple.cpp b/fdbclient/Tuple.cpp index 367a7b80fb..ab1fcb0314 100644 --- a/fdbclient/Tuple.cpp +++ b/fdbclient/Tuple.cpp @@ -71,6 +71,8 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) { i += sizeof(float) + 1; } else if (data[i] == 0x21) { i += sizeof(double) + 1; + } else if (data[i] == 0x26 || data[i] == 0x27) { + i += 1; } else if (data[i] == '\x00') { i += 1; } else { @@ -144,6 +146,16 @@ Tuple& Tuple::append(int64_t value) { return *this; } +Tuple& Tuple::appendBool(bool value) { + offsets.push_back(data.size()); + if (value) { + data.push_back(data.arena(), 0x27); + } else { + data.push_back(data.arena(), 0x26); + } + return *this; +} + Tuple& Tuple::appendFloat(float value) { offsets.push_back(data.size()); float swap = bigEndianFloat(value); @@ -192,6 +204,8 @@ Tuple::ElementType Tuple::getType(size_t index) const { return ElementType::FLOAT; } else if (code == 0x21) { return ElementType::DOUBLE; + } else if (code == 0x26 || code == 0x27) { + return ElementType::BOOL; } else { throw invalid_tuple_data_type(); } @@ -287,6 +301,21 @@ int64_t Tuple::getInt(size_t index, bool allow_incomplete) const { } // TODO: Combine with bindings/flow/Tuple.*. This code is copied from there. +bool Tuple::getBool(size_t index) const { + if (index >= offsets.size()) { + throw invalid_tuple_index(); + } + ASSERT_LT(offsets[index], data.size()); + uint8_t code = data[offsets[index]]; + if (code == 0x26) { + return false; + } else if (code == 0x27) { + return true; + } else { + throw invalid_tuple_data_type(); + } +} + float Tuple::getFloat(size_t index) const { if (index >= offsets.size()) { throw invalid_tuple_index(); diff --git a/fdbclient/Tuple.h b/fdbclient/Tuple.h index 3dc597f262..62feba307b 100644 --- a/fdbclient/Tuple.h +++ b/fdbclient/Tuple.h @@ -40,6 +40,7 @@ struct Tuple { Tuple& append(int64_t); // There are some ambiguous append calls in fdbclient, so to make it easier // to add append for floats and doubles, name them differently for now. + Tuple& appendBool(bool); Tuple& appendFloat(float); Tuple& appendDouble(double); Tuple& appendNull(); @@ -51,7 +52,7 @@ struct Tuple { return append(t); } - enum ElementType { NULL_TYPE, INT, BYTES, UTF8, FLOAT, DOUBLE }; + enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE }; // this is number of elements, not length of data size_t size() const { return offsets.size(); } @@ -59,6 +60,7 @@ struct Tuple { ElementType getType(size_t index) const; Standalone getString(size_t index) const; int64_t getInt(size_t index, bool allow_incomplete = false) const; + bool getBool(size_t index) const; float getFloat(size_t index) const; double getDouble(size_t index) const; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index d6a2482950..ab4cfe2b3e 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -3988,7 +3988,7 @@ ACTOR Future monitorGlobalConfig(ClusterControllerData::DBInfo* db) { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); state Optional globalConfigVersion = wait(tr.get(globalConfigVersionKey)); - state ClientDBInfo clientInfo = db->clientInfo->get(); + state ClientDBInfo clientInfo = db->serverInfo->get().client; if (globalConfigVersion.present()) { // Since the history keys end with versionstamps, they @@ -4046,6 +4046,14 @@ ACTOR Future monitorGlobalConfig(ClusterControllerData::DBInfo* db) { } clientInfo.id = deterministicRandom()->randomUniqueID(); + // Update ServerDBInfo so fdbserver processes receive updated history. + ServerDBInfo serverInfo = db->serverInfo->get(); + serverInfo.id = deterministicRandom()->randomUniqueID(); + serverInfo.infoGeneration = ++db->dbInfoCount; + serverInfo.client = clientInfo; + db->serverInfo->set(serverInfo); + + // Update ClientDBInfo so client processes receive updated history. db->clientInfo->set(clientInfo); } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 1b4affa606..e45a1d6617 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -22,6 +22,7 @@ #include #include "fdbrpc/Locality.h" +#include "fdbclient/GlobalConfig.actor.h" #include "fdbclient/StorageServerInterface.h" #include "fdbserver/Knobs.h" #include "flow/ActorCollection.h" @@ -139,12 +140,14 @@ Database openDBOnServer(Reference> const& db, bool enableLocalityLoadBalance, bool lockAware) { auto info = makeReference>(); - return DatabaseContext::create(info, - extractClientInfo(db, info), - enableLocalityLoadBalance ? db->get().myLocality : LocalityData(), - enableLocalityLoadBalance, - taskID, - lockAware); + auto cx = DatabaseContext::create(info, + extractClientInfo(db, info), + enableLocalityLoadBalance ? db->get().myLocality : LocalityData(), + enableLocalityLoadBalance, + taskID, + lockAware); + GlobalConfig::create(cx, db, std::addressof(db->get().client)); + return cx; } struct ErrorInfo { @@ -1292,7 +1295,6 @@ ACTOR Future workerServer(Reference connFile, notUpdated = interf.updateServerDBInfo.getEndpoint(); } else if (localInfo.infoGeneration > dbInfo->get().infoGeneration || dbInfo->get().clusterInterface != ccInterface->get().get()) { - TraceEvent("GotServerDBInfoChange") .detail("ChangeID", localInfo.id) .detail("MasterID", localInfo.master.id()) diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 400b9cdf41..a4b67f6fdf 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -697,6 +697,16 @@ private: AsyncVar v; }; +// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes +// the AsyncTrigger is triggered. +ACTOR template +void forward(Reference> from, AsyncTrigger* to) { + loop { + wait(from->onChange()); + to->trigger(); + } +} + class Debouncer : NonCopyable { public: explicit Debouncer(double delay) { worker = debounceWorker(this, delay); }