diff --git a/fdbserver/ConfigBroadcaster.actor.cpp b/fdbserver/ConfigBroadcaster.actor.cpp index d09fbe2040..3848f4f317 100644 --- a/fdbserver/ConfigBroadcaster.actor.cpp +++ b/fdbserver/ConfigBroadcaster.actor.cpp @@ -31,9 +31,8 @@ bool matchesConfigClass(Optional const& configClassSet, Optional } // namespace class ConfigBroadcasterImpl { - std::map configClassToToken; - std::map> tokenToReply; - std::map> tokenToConfigClasses; + std::map> configClassToTokens; + std::map tokenToRequest; std::map snapshot; std::deque> versionedMutations; Version lastCompactedVersion; @@ -44,11 +43,32 @@ class ConfigBroadcasterImpl { UID id; CounterCollection cc; Counter compactRequest; - Counter successfulChangeRequest; + mutable Counter successfulChangeRequest; Counter failedChangeRequest; Counter snapshotRequest; Future logger; + template + void sendChangesReply(ConfigFollowerGetChangesRequest const& req, Changes const& changes) const { + ASSERT_LT(req.lastSeenVersion, mostRecentVersion); + ConfigFollowerGetChangesReply reply; + reply.mostRecentVersion = mostRecentVersion; + for (const auto& versionedMutation : changes) { + if (versionedMutation.version > req.lastSeenVersion && + matchesConfigClass(req.configClassSet, versionedMutation.mutation.getConfigClass())) { + TraceEvent(SevDebug, "ConfigBroadcasterSendingChangeMutation", id) + .detail("Version", versionedMutation.version) + .detail("ReqLastSeenVersion", req.lastSeenVersion) + .detail("ConfigClass", versionedMutation.mutation.getConfigClass()) + .detail("KnobName", versionedMutation.mutation.getKnobName()) + .detail("KnobValue", versionedMutation.mutation.getValue()); + reply.versionedMutations.push_back_deep(reply.versionedMutations.arena(), versionedMutation); + } + } + req.reply.send(reply); + ++successfulChangeRequest; + } + ACTOR static Future serve(ConfigBroadcaster* self, ConfigBroadcasterImpl* impl, ConfigFollowerInterface cfi) { impl->actors.add(impl->consumer->consume(*self)); loop { @@ -70,29 +90,19 @@ class ConfigBroadcasterImpl { ++impl->failedChangeRequest; continue; } - // TODO: If there are no new changes, register the request and push - // changes when ready - ConfigFollowerGetChangesReply reply; - reply.mostRecentVersion = impl->mostRecentVersion; - for (const auto& versionedMutation : impl->versionedMutations) { - if (versionedMutation.version > req.lastSeenVersion && - matchesConfigClass(req.configClassSet, versionedMutation.mutation.getConfigClass())) { - TraceEvent(SevDebug, "ConfigBroadcasterSendingChangeMutation", impl->id) - .detail("Version", versionedMutation.version) - .detail("ReqLastSeenVersion", req.lastSeenVersion) - .detail("ConfigClass", versionedMutation.mutation.getConfigClass()) - .detail("KnobName", versionedMutation.mutation.getKnobName()) - .detail("KnobValue", versionedMutation.mutation.getValue()); - reply.versionedMutations.push_back_deep(reply.versionedMutations.arena(), - versionedMutation); + if (req.lastSeenVersion < impl->mostRecentVersion) { + impl->sendChangesReply(req, impl->versionedMutations); + } else { + auto token = req.reply.getEndpoint().token; + impl->tokenToRequest[token] = req; + ASSERT(req.configClassSet.present()); + for (const auto& configClass : req.configClassSet.get().getClasses()) { + impl->configClassToTokens[configClass].push_back(token); } } - req.reply.send(reply); - ++impl->successfulChangeRequest; } when(ConfigFollowerCompactRequest req = waitNext(cfi.compact.getFuture())) { ++impl->compactRequest; - // TODO: Use std::algorithm here while (!impl->versionedMutations.empty()) { const auto& version = impl->versionedMutations.front().version; if (version > req.version) { @@ -121,19 +131,35 @@ class ConfigBroadcasterImpl { public: Future serve(ConfigBroadcaster* self, ConfigFollowerInterface const& cfi) { return serve(self, this, cfi); } - Future addVersionedMutations(Standalone> const& versionedMutations, + Future addVersionedMutations(Standalone> const& changes, Version mostRecentVersion) { - this->versionedMutations.insert( - this->versionedMutations.end(), versionedMutations.begin(), versionedMutations.end()); - for (const auto& versionedMutation : versionedMutations) { + this->mostRecentVersion = mostRecentVersion; + versionedMutations.insert(versionedMutations.end(), changes.begin(), changes.end()); + std::set toNotify; + for (const auto& versionedMutation : changes) { const auto& mutation = versionedMutation.mutation; + if (!mutation.getConfigClass().present()) { + // Update everything + for (const auto& [token, req] : tokenToRequest) { + toNotify.insert(token); + } + } else { + for (const auto& token : configClassToTokens[mutation.getConfigClass().get()]) { + toNotify.insert(token); + } + configClassToTokens.clear(); + } if (mutation.isSet()) { snapshot[mutation.getKey()] = mutation.getValue().get(); } else { snapshot.erase(mutation.getKey()); } } - this->mostRecentVersion = mostRecentVersion; + for (const auto& token : toNotify) { + // TODO: What if this reply gets lost? + sendChangesReply(tokenToRequest[token], changes); + tokenToRequest.erase(token); + } return Void(); } diff --git a/fdbserver/ConfigFollowerInterface.cpp b/fdbserver/ConfigFollowerInterface.cpp index 8b496b0a63..d53764c130 100644 --- a/fdbserver/ConfigFollowerInterface.cpp +++ b/fdbserver/ConfigFollowerInterface.cpp @@ -56,3 +56,7 @@ ConfigClassSet::ConfigClassSet(VectorRef configClasses) { bool ConfigClassSet::contains(KeyRef configClass) const { return classes.count(configClass); } + +std::set const& ConfigClassSet::getClasses() const { + return classes; +} diff --git a/fdbserver/ConfigFollowerInterface.h b/fdbserver/ConfigFollowerInterface.h index a527dbe8e3..b00790aa6e 100644 --- a/fdbserver/ConfigFollowerInterface.h +++ b/fdbserver/ConfigFollowerInterface.h @@ -37,6 +37,7 @@ public: ConfigClassSet(VectorRef configClasses); bool contains(KeyRef configClass) const; + std::set const& getClasses() const; template void serialize(Ar& ar) { diff --git a/fdbserver/LocalConfiguration.actor.cpp b/fdbserver/LocalConfiguration.actor.cpp index f823a2abc3..a9bdedd2a6 100644 --- a/fdbserver/LocalConfiguration.actor.cpp +++ b/fdbserver/LocalConfiguration.actor.cpp @@ -58,7 +58,8 @@ class ConfigKnobOverrides { public: ConfigKnobOverrides() = default; explicit ConfigKnobOverrides(std::string const& paramString) { - // TODO: Validate string + ASSERT(std::all_of( + paramString.begin(), paramString.end(), [](char c) { return isalpha(c) || c == '/' || c == '-'; })); StringRef s = stringToKeyRef(paramString); while (s.size()) { configPath.push_back_deep(configPath.arena(), s.eat("/"_sr)); diff --git a/fdbserver/LocalConfiguration.h b/fdbserver/LocalConfiguration.h index 44cd3714c2..b8b0fe7ebd 100644 --- a/fdbserver/LocalConfiguration.h +++ b/fdbserver/LocalConfiguration.h @@ -53,7 +53,6 @@ public: ClientKnobs const& getClientKnobs() const; ServerKnobs const& getServerKnobs() const; TestKnobs const& getTestKnobs() const; - // TODO: Only one field of serverDBInfo is required, so improve encapsulation Future consume(Reference const> const& broadcaster); Future setSnapshot(std::map const& snapshot, Version snapshotVersion); Future addVersionedMutations(Standalone> versionedMutations, diff --git a/fdbserver/SimpleConfigConsumer.actor.cpp b/fdbserver/SimpleConfigConsumer.actor.cpp index 5b3bf88f31..2aa127a41d 100644 --- a/fdbserver/SimpleConfigConsumer.actor.cpp +++ b/fdbserver/SimpleConfigConsumer.actor.cpp @@ -146,8 +146,7 @@ public: template Future consume(ConfigStore& configStore) { - // TODO: Reenable compaction - return fetchChanges(this, &configStore); /* ||compactor(this); */ + return fetchChanges(this, &configStore) || compactor(this); } UID getID() const { return id; } diff --git a/fdbserver/SimpleConfigDatabaseNode.actor.cpp b/fdbserver/SimpleConfigDatabaseNode.actor.cpp index b87e2e6541..cd389fc1d1 100644 --- a/fdbserver/SimpleConfigDatabaseNode.actor.cpp +++ b/fdbserver/SimpleConfigDatabaseNode.actor.cpp @@ -30,11 +30,11 @@ namespace { -const KeyRef lastCompactedVersionKey = LiteralStringRef("lastCompactedVersion"); -const KeyRef liveTransactionVersionKey = LiteralStringRef("liveTransactionVersion"); -const KeyRef committedVersionKey = LiteralStringRef("committedVersion"); -const KeyRangeRef kvKeys = KeyRangeRef(LiteralStringRef("kv/"), LiteralStringRef("kv0")); -const KeyRangeRef mutationKeys = KeyRangeRef(LiteralStringRef("mutation/"), LiteralStringRef("mutation0")); +const KeyRef lastCompactedVersionKey = "lastCompactedVersion"_sr; +const KeyRef liveTransactionVersionKey = "liveTransactionVersion"_sr; +const KeyRef committedVersionKey = "committedVersion"_sr; +const KeyRangeRef kvKeys = KeyRangeRef("kv/"_sr, "kv0"_sr); +const KeyRangeRef mutationKeys = KeyRangeRef("mutation/"_sr, "mutation0"_sr); Key versionedMutationKey(Version version, uint32_t index) { ASSERT(version >= 0);