/* * ConfigBroadcaster.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "fdbserver/ConfigBroadcaster.h" #include "fdbserver/Knobs.h" #include "fdbserver/IConfigConsumer.h" #include "flow/UnitTest.h" #include "flow/actorcompiler.h" // must be last include namespace { bool matchesConfigClass(ConfigClassSet const& configClassSet, Optional configClass) { return !configClass.present() || configClassSet.contains(configClass.get()); } // Helper functions for STL containers, with flow-friendly error handling template auto get(MapContainer& m, K const& k) -> decltype(m.at(k)) { auto it = m.find(k); ASSERT(it != m.end()); return it->second; } template void remove(Container& container, K const& k) { auto it = container.find(k); ASSERT(it != container.end()); container.erase(it); } } // namespace class ConfigBroadcasterImpl { // PendingRequestStore stores a set of pending ConfigBroadcastFollowerGetChangesRequests, // indexed by configuration class. When an update is received, replies are sent for all // pending requests with affected configuration classes /* class PendingRequestStore { using Req = ConfigBroadcastFollowerGetChangesRequest; std::map> configClassToTokens; std::map tokenToRequest; public: void addRequest(Req const& req) { auto token = req.reply.getEndpoint().token; tokenToRequest[token] = req; for (const auto& configClass : req.configClassSet.getClasses()) { configClassToTokens[configClass].insert(token); } } std::vector getRequestsToNotify(Standalone> const& changes) const { std::set tokenSet; for (const auto& change : changes) { if (!change.mutation.getConfigClass().present()) { // Update everything for (const auto& [token, req] : tokenToRequest) { if (req.lastSeenVersion < change.version) { tokenSet.insert(token); } } } else { Key configClass = change.mutation.getConfigClass().get(); if (configClassToTokens.count(configClass)) { auto tokens = get(configClassToTokens, Key(change.mutation.getConfigClass().get())); for (const auto& token : tokens) { auto req = get(tokenToRequest, token); if (req.lastSeenVersion < change.version) { tokenSet.insert(token); } else { TEST(true); // Worker is ahead of config broadcaster } } } } } std::vector result; for (const auto& token : tokenSet) { result.push_back(get(tokenToRequest, token)); } return result; } std::vector getOutdatedRequests(Version newSnapshotVersion) { std::vector result; for (const auto& [token, req] : tokenToRequest) { if (req.lastSeenVersion < newSnapshotVersion) { result.push_back(req); } } return result; } void removeRequest(Req const& req) { auto token = req.reply.getEndpoint().token; for (const auto& configClass : req.configClassSet.getClasses()) { remove(get(configClassToTokens, configClass), token); // TODO: Don't leak config classes } remove(tokenToRequest, token); } } pending; */ std::map snapshot; std::deque mutationHistory; std::deque annotationHistory; Version lastCompactedVersion; Version mostRecentVersion; std::unique_ptr consumer; ActorCollection actors{ false }; std::vector> workers; // TODO: Compact into struct UID id; CounterCollection cc; Counter compactRequest; Counter successfulChangeRequest; Counter failedChangeRequest; Counter snapshotRequest; Future logger; // Push changes to the specified clients. template Future pushChanges(std::vector>::iterator begin, std::vector>::iterator end, Changes const& changes) { std::vector> responses; for (auto it = begin; it != end; ++it) { auto& [configClassSet, lastSeenVersion, worker] = *it; ConfigBroadcastChangesRequest req; // Skip if client has already been updated to the latest version. if (lastSeenVersion >= mostRecentVersion) { continue; } for (const auto& versionedMutation : changes) { if (versionedMutation.version > lastSeenVersion && matchesConfigClass(configClassSet, versionedMutation.mutation.getConfigClass())) { TraceEvent te(SevDebug, "ConfigBroadcasterSendingChangeMutation", id); te.detail("Version", versionedMutation.version) .detail("ReqLastSeenVersion", lastSeenVersion) .detail("ConfigClass", versionedMutation.mutation.getConfigClass()) .detail("KnobName", versionedMutation.mutation.getKnobName()); if (versionedMutation.mutation.isSet()) { te.detail("Op", "Set").detail("KnobValue", versionedMutation.mutation.getValue().toString()); } else { te.detail("Op", "Clear"); } req.changes.push_back_deep(req.changes.arena(), versionedMutation); } } if (req.changes.size() > 0) { lastSeenVersion = mostRecentVersion; req.mostRecentVersion = mostRecentVersion; // TODO: Retry in event of failure responses.push_back(success(worker->interf.configBroadcastInterface.getChanges.getReply(req))); ++successfulChangeRequest; } } return waitForAll(responses); } ConfigBroadcasterImpl() : lastCompactedVersion(0), mostRecentVersion(0), id(deterministicRandom()->randomUniqueID()), cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc), successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc), snapshotRequest("SnapshotRequest", cc) { logger = traceCounters( "ConfigBroadcasterMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigBroadcasterMetrics"); } void addChanges(Standalone> const& changes, Version mostRecentVersion, Standalone> const& annotations) { this->mostRecentVersion = mostRecentVersion; mutationHistory.insert(mutationHistory.end(), changes.begin(), changes.end()); annotationHistory.insert(annotationHistory.end(), annotations.begin(), annotations.end()); for (const auto& change : changes) { const auto& mutation = change.mutation; if (mutation.isSet()) { snapshot[mutation.getKey()] = mutation.getValue(); } else { snapshot.erase(mutation.getKey()); } } actors.add(pushChanges(workers.begin(), workers.end(), changes)); } template Future setSnapshot(Snapshot&& snapshot, Version snapshotVersion) { this->snapshot = std::forward(snapshot); this->lastCompactedVersion = snapshotVersion; return Void(); } public: Future registerWorker(ConfigBroadcaster* self, Version lastSeenVersion, ConfigClassSet configClassSet, Future& watcher, WorkerDetails* worker) { actors.add(consumer->consume(*self)); // TODO: Use `watcher` to detect death of client workers.push_back(std::make_tuple(std::move(configClassSet), lastSeenVersion, worker)); return pushChanges(workers.end() - 1, workers.end(), mutationHistory); } void applyChanges(Standalone> const& changes, Version mostRecentVersion, Standalone> const& annotations) { TraceEvent(SevDebug, "ConfigBroadcasterApplyingChanges", id) .detail("ChangesSize", changes.size()) .detail("CurrentMostRecentVersion", this->mostRecentVersion) .detail("NewMostRecentVersion", mostRecentVersion) .detail("AnnotationsSize", annotations.size()); addChanges(changes, mostRecentVersion, annotations); } template void applySnapshotAndChanges(Snapshot&& snapshot, Version snapshotVersion, Standalone> const& changes, Version changesVersion, Standalone> const& annotations) { TraceEvent(SevDebug, "ConfigBroadcasterApplyingSnapshotAndChanges", id) .detail("CurrentMostRecentVersion", this->mostRecentVersion) .detail("SnapshotSize", snapshot.size()) .detail("SnapshotVersion", snapshotVersion) .detail("ChangesSize", changes.size()) .detail("ChangesVersion", changesVersion) .detail("AnnotationsSize", annotations.size()); setSnapshot(std::forward(snapshot), snapshotVersion); addChanges(changes, changesVersion, annotations); } ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() { consumer = IConfigConsumer::createTestSimple(cfi, 0.5, Optional{}); TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id).detail("Consumer", consumer->getID()); } ConfigBroadcasterImpl(ServerCoordinators const& coordinators, UseConfigDB useConfigDB) : ConfigBroadcasterImpl() { if (useConfigDB != UseConfigDB::DISABLED) { if (useConfigDB == UseConfigDB::SIMPLE) { consumer = IConfigConsumer::createSimple(coordinators, 0.5, Optional{}); } else { consumer = IConfigConsumer::createPaxos(coordinators, 0.5, Optional{}); } TraceEvent(SevDebug, "BroadcasterStartingConsumer", id) .detail("Consumer", consumer->getID()) .detail("UsingSimpleConsumer", useConfigDB == UseConfigDB::SIMPLE); } } JsonBuilderObject getStatus() const { JsonBuilderObject result; JsonBuilderArray mutationsArray; for (const auto& versionedMutation : mutationHistory) { JsonBuilderObject mutationObject; mutationObject["version"] = versionedMutation.version; const auto& mutation = versionedMutation.mutation; mutationObject["config_class"] = mutation.getConfigClass().orDefault(""_sr); mutationObject["knob_name"] = mutation.getKnobName(); mutationObject["knob_value"] = mutation.getValue().toString(); mutationsArray.push_back(std::move(mutationObject)); } result["mutations"] = std::move(mutationsArray); JsonBuilderArray commitsArray; for (const auto& versionedAnnotation : annotationHistory) { JsonBuilderObject commitObject; commitObject["version"] = versionedAnnotation.version; commitObject["description"] = versionedAnnotation.annotation.description; commitObject["timestamp"] = versionedAnnotation.annotation.timestamp; commitsArray.push_back(std::move(commitObject)); } result["commits"] = std::move(commitsArray); JsonBuilderObject snapshotObject; std::map, std::vector>> snapshotMap; for (const auto& [configKey, value] : snapshot) { snapshotMap[configKey.configClass.castTo()].emplace_back(configKey.knobName, value.toString()); } for (const auto& [configClass, kvs] : snapshotMap) { JsonBuilderObject kvsObject; for (const auto& [knobName, knobValue] : kvs) { kvsObject[knobName] = knobValue; } snapshotObject[configClass.orDefault(""_sr)] = std::move(kvsObject); } result["snapshot"] = std::move(snapshotObject); result["last_compacted_version"] = lastCompactedVersion; result["most_recent_version"] = mostRecentVersion; return result; } void compact(Version compactionVersion) { { auto it = std::find_if(mutationHistory.begin(), mutationHistory.end(), [compactionVersion](const auto& vm) { return vm.version > compactionVersion; }); mutationHistory.erase(mutationHistory.begin(), it); } { auto it = std::find_if(annotationHistory.begin(), annotationHistory.end(), [compactionVersion](const auto& va) { return va.version > compactionVersion; }); annotationHistory.erase(annotationHistory.begin(), it); } } UID getID() const { return id; } static void runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches); }; ConfigBroadcaster::ConfigBroadcaster(ConfigFollowerInterface const& cfi) : _impl(std::make_unique(cfi)) {} ConfigBroadcaster::ConfigBroadcaster(ServerCoordinators const& coordinators, UseConfigDB useConfigDB) : _impl(std::make_unique(coordinators, useConfigDB)) {} ConfigBroadcaster::ConfigBroadcaster(ConfigBroadcaster&&) = default; ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default; ConfigBroadcaster::~ConfigBroadcaster() = default; Future ConfigBroadcaster::registerWorker(Version lastSeenVersion, ConfigClassSet configClassSet, Future& watcher, WorkerDetails* worker) { return impl().registerWorker(this, lastSeenVersion, configClassSet, watcher, worker); } void ConfigBroadcaster::applyChanges(Standalone> const& changes, Version mostRecentVersion, Standalone> const& annotations) { impl().applyChanges(changes, mostRecentVersion, annotations); } void ConfigBroadcaster::applySnapshotAndChanges( std::map const& snapshot, Version snapshotVersion, Standalone> const& changes, Version changesVersion, Standalone> const& annotations) { impl().applySnapshotAndChanges(snapshot, snapshotVersion, changes, changesVersion, annotations); } void ConfigBroadcaster::applySnapshotAndChanges( std::map&& snapshot, Version snapshotVersion, Standalone> const& changes, Version changesVersion, Standalone> const& annotations) { impl().applySnapshotAndChanges(std::move(snapshot), snapshotVersion, changes, changesVersion, annotations); } UID ConfigBroadcaster::getID() const { return impl().getID(); } JsonBuilderObject ConfigBroadcaster::getStatus() const { return impl().getStatus(); } void ConfigBroadcaster::compact(Version compactionVersion) { impl().compact(compactionVersion); } namespace { // Standalone> getTestChanges(Version version, bool includeGlobalMutation) { // Standalone> changes; // if (includeGlobalMutation) { // ConfigKey key = ConfigKeyRef({}, "test_long"_sr); // auto value = KnobValue::create(int64_t{ 5 }); // ConfigMutation mutation = ConfigMutationRef(key, value.contents()); // changes.emplace_back_deep(changes.arena(), version, mutation); // } // { // ConfigKey key = ConfigKeyRef("class-A"_sr, "test_long"_sr); // auto value = KnobValue::create(int64_t{ 5 }); // ConfigMutation mutation = ConfigMutationRef(key, value.contents()); // changes.emplace_back_deep(changes.arena(), version, mutation); // } // return changes; // } // ConfigBroadcastFollowerGetChangesRequest getTestRequest(Version lastSeenVersion, // std::vector const& configClasses) { // Standalone> configClassesVector; // for (const auto& configClass : configClasses) { // configClassesVector.push_back_deep(configClassesVector.arena(), configClass); // } // return ConfigBroadcastFollowerGetChangesRequest{ lastSeenVersion, ConfigClassSet{ configClassesVector } }; // } } // namespace // void ConfigBroadcasterImpl::runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches) { // PendingRequestStore pending; // for (Version v = 0; v < 5; ++v) { // pending.addRequest(getTestRequest(v, {})); // pending.addRequest(getTestRequest(v, { "class-A"_sr })); // pending.addRequest(getTestRequest(v, { "class-B"_sr })); // pending.addRequest(getTestRequest(v, { "class-A"_sr, "class-B"_sr })); // } // auto toNotify = pending.getRequestsToNotify(getTestChanges(0, includeGlobalMutation)); // ASSERT_EQ(toNotify.size(), 0); // for (Version v = 1; v <= 5; ++v) { // auto toNotify = pending.getRequestsToNotify(getTestChanges(v, includeGlobalMutation)); // ASSERT_EQ(toNotify.size(), expectedMatches); // for (const auto& req : toNotify) { // pending.removeRequest(req); // } // } // } // TEST_CASE("/fdbserver/ConfigDB/ConfigBroadcaster/Internal/PendingRequestStore/Simple") { // ConfigBroadcasterImpl::runPendingRequestStoreTest(false, 2); // return Void(); // } // // TEST_CASE("/fdbserver/ConfigDB/ConfigBroadcaster/Internal/PendingRequestStore/GlobalMutation") { // ConfigBroadcasterImpl::runPendingRequestStoreTest(true, 4); // return Void(); // }