/* * ConfigNode.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "fdbclient/SystemData.h" #include "fdbserver/ConfigNode.h" #include "fdbserver/IKeyValueStore.h" #include "fdbserver/OnDemandStore.h" #include "flow/Arena.h" #include "flow/genericactors.actor.h" #include "flow/UnitTest.h" #include "flow/actorcompiler.h" // This must be the last #include. namespace { const KeyRef lastCompactedVersionKey = "lastCompactedVersion"_sr; const KeyRef currentGenerationKey = "currentGeneration"_sr; const KeyRef registeredKey = "registered"_sr; const KeyRangeRef kvKeys = KeyRangeRef("kv/"_sr, "kv0"_sr); const KeyRangeRef mutationKeys = KeyRangeRef("mutation/"_sr, "mutation0"_sr); const KeyRangeRef annotationKeys = KeyRangeRef("annotation/"_sr, "annotation0"_sr); Key versionedAnnotationKey(Version version) { ASSERT_GE(version, 0); return BinaryWriter::toValue(bigEndian64(version), IncludeVersion()).withPrefix(annotationKeys.begin); } Version getVersionFromVersionedAnnotationKey(KeyRef versionedAnnotationKey) { return fromBigEndian64(BinaryReader::fromStringRef( versionedAnnotationKey.removePrefix(annotationKeys.begin), IncludeVersion())); } Key versionedMutationKey(Version version, uint32_t index) { ASSERT_GE(version, 0); BinaryWriter bw(IncludeVersion()); bw << bigEndian64(version); bw << bigEndian32(index); return bw.toValue().withPrefix(mutationKeys.begin); } Version getVersionFromVersionedMutationKey(KeyRef versionedMutationKey) { uint64_t bigEndianResult; ASSERT(versionedMutationKey.startsWith(mutationKeys.begin)); BinaryReader br(versionedMutationKey.removePrefix(mutationKeys.begin), IncludeVersion()); br >> bigEndianResult; return fromBigEndian64(bigEndianResult); } template void assertCommitted(RangeResult const& range, VectorRef versionedConfigs, std::function fn) { if (range.size() == 0) { return; } // Verify every versioned value read from disk (minus the last one which // may not be committed on a quorum) exists in the rollforward changes. for (auto it = range.begin(); it != std::prev(range.end()); ++it) { Version version = fn(it->key); auto resultIt = std::find_if( versionedConfigs.begin(), versionedConfigs.end(), [version](T const& o) { return o.version == version; }); ASSERT(resultIt != versionedConfigs.end()); } } } // namespace TEST_CASE("/fdbserver/ConfigDB/ConfigNode/Internal/versionedMutationKeys") { std::vector keys; for (Version version = 0; version < 1000; ++version) { for (int index = 0; index < 5; ++index) { keys.push_back(versionedMutationKey(version, index)); } } for (int i = 0; i < 5000; ++i) { ASSERT(getVersionFromVersionedMutationKey(keys[i]) == i / 5); } return Void(); } TEST_CASE("/fdbserver/ConfigDB/ConfigNode/Internal/versionedMutationKeyOrdering") { Standalone> keys; for (Version version = 0; version < 1000; ++version) { for (auto index = 0; index < 5; ++index) { keys.push_back_deep(keys.arena(), versionedMutationKey(version, index)); } } for (auto index = 0; index < 1000; ++index) { keys.push_back_deep(keys.arena(), versionedMutationKey(1000, index)); } ASSERT(std::is_sorted(keys.begin(), keys.end())); return Void(); } class ConfigNodeImpl { UID id; OnDemandStore kvStore; CounterCollection cc; // Follower counters Counter compactRequests; Counter rollbackRequests; Counter rollforwardRequests; Counter successfulChangeRequests; Counter failedChangeRequests; Counter snapshotRequests; Counter getCommittedVersionRequests; // Transaction counters Counter successfulCommits; Counter failedCommits; Counter setMutations; Counter clearMutations; Counter getValueRequests; Counter getGenerationRequests; Future logger; ACTOR static Future getGeneration(ConfigNodeImpl* self) { state ConfigGeneration generation; Optional value = wait(self->kvStore->readValue(currentGenerationKey)); if (value.present()) { generation = BinaryReader::fromStringRef(value.get(), IncludeVersion()); } else { self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion()))); wait(self->kvStore->commit()); } return generation; } ACTOR static Future getLastCompactedVersion(ConfigNodeImpl* self) { Optional value = wait(self->kvStore->readValue(lastCompactedVersionKey)); state Version lastCompactedVersion = 0; if (value.present()) { lastCompactedVersion = BinaryReader::fromStringRef(value.get(), IncludeVersion()); } else { self->kvStore->set( KeyValueRef(lastCompactedVersionKey, BinaryWriter::toValue(lastCompactedVersion, IncludeVersion()))); wait(self->kvStore->commit()); } return lastCompactedVersion; } // Returns all commit annotations between for commits with version in [startVersion, endVersion] ACTOR static Future>> getAnnotations(ConfigNodeImpl* self, Version startVersion, Version endVersion) { Key startKey = versionedAnnotationKey(startVersion); Key endKey = versionedAnnotationKey(endVersion + 1); state KeyRangeRef keys(startKey, endKey); RangeResult range = wait(self->kvStore->readRange(keys)); Standalone> result; for (const auto& kv : range) { auto version = getVersionFromVersionedAnnotationKey(kv.key); ASSERT_LE(version, endVersion); auto annotation = BinaryReader::fromStringRef(kv.value, IncludeVersion()); result.emplace_back_deep(result.arena(), version, annotation); } return result; } // Returns all mutations with version in [startVersion, endVersion] ACTOR static Future>> getMutations(ConfigNodeImpl* self, Version startVersion, Version endVersion) { Key startKey = versionedMutationKey(startVersion, 0); Key endKey = versionedMutationKey(endVersion + 1, 0); state KeyRangeRef keys(startKey, endKey); RangeResult range = wait(self->kvStore->readRange(keys)); Standalone> result; for (const auto& kv : range) { auto version = getVersionFromVersionedMutationKey(kv.key); ASSERT_LE(version, endVersion); auto mutation = ObjectReader::fromStringRef(kv.value, IncludeVersion()); result.emplace_back_deep(result.arena(), version, mutation); } return result; } ACTOR static Future getChanges(ConfigNodeImpl* self, ConfigFollowerGetChangesRequest req) { Version lastCompactedVersion = wait(getLastCompactedVersion(self)); if (req.lastSeenVersion < lastCompactedVersion) { ++self->failedChangeRequests; req.reply.sendError(version_already_compacted()); return Void(); } state Version committedVersion = wait(map(getGeneration(self), [](auto const& gen) { return gen.committedVersion; })); // TODO: Reenable this when running the ConfigIncrement workload with reboot=false if (committedVersion < req.mostRecentVersion) { // Handle a very rare case where a ConfigNode loses data between // responding with a committed version and responding to the // subsequent get changes request. TEST(true); // ConfigNode data loss occurred on a minority of coordinators req.reply.sendError(process_behind()); // Reuse the process_behind error return Void(); } state Standalone> versionedMutations = wait(getMutations(self, req.lastSeenVersion + 1, committedVersion)); state Standalone> versionedAnnotations = wait(getAnnotations(self, req.lastSeenVersion + 1, committedVersion)); TraceEvent(SevDebug, "ConfigNodeSendingChanges", self->id) .detail("ReqLastSeenVersion", req.lastSeenVersion) .detail("CommittedVersion", committedVersion) .detail("NumMutations", versionedMutations.size()) .detail("NumCommits", versionedAnnotations.size()); ++self->successfulChangeRequests; req.reply.send(ConfigFollowerGetChangesReply{ committedVersion, versionedMutations, versionedAnnotations }); return Void(); } // New transactions increment the database's current live version. This effectively serves as a lock, providing // serializability ACTOR static Future getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) { state ConfigGeneration generation = wait(getGeneration(self)); ++generation.liveVersion; if (req.lastSeenLiveVersion.present()) { TEST(req.lastSeenLiveVersion.get() >= generation.liveVersion); // Node is lagging behind some other node generation.liveVersion = std::max(generation.liveVersion, req.lastSeenLiveVersion.get() + 1); } self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion()))); wait(self->kvStore->commit()); req.reply.send(ConfigTransactionGetGenerationReply{ generation }); return Void(); } ACTOR static Future get(ConfigNodeImpl* self, ConfigTransactionGetRequest req) { ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.generation != currentGeneration) { // TODO: Also send information about highest seen version req.reply.sendError(transaction_too_old()); return Void(); } state Optional serializedValue = wait(self->kvStore->readValue(BinaryWriter::toValue(req.key, IncludeVersion()).withPrefix(kvKeys.begin))); state Optional value; if (serializedValue.present()) { value = ObjectReader::fromStringRef(serializedValue.get(), IncludeVersion()); } Standalone> versionedMutations = wait(getMutations(self, 0, req.generation.committedVersion)); for (const auto& versionedMutation : versionedMutations) { const auto& mutation = versionedMutation.mutation; if (mutation.getKey() == req.key) { if (mutation.isSet()) { value = mutation.getValue(); } else { value = {}; } } } req.reply.send(ConfigTransactionGetReply{ value }); return Void(); } // Retrieve all configuration classes that contain explicitly defined knobs // TODO: Currently it is possible that extra configuration classes may be returned, we // may want to fix this to clean up the contract ACTOR static Future getConfigClasses(ConfigNodeImpl* self, ConfigTransactionGetConfigClassesRequest req) { ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.generation != currentGeneration) { req.reply.sendError(transaction_too_old()); return Void(); } state RangeResult snapshot = wait(self->kvStore->readRange(kvKeys)); state std::set configClassesSet; for (const auto& kv : snapshot) { auto configKey = BinaryReader::fromStringRef(kv.key.removePrefix(kvKeys.begin), IncludeVersion()); if (configKey.configClass.present()) { configClassesSet.insert(configKey.configClass.get()); } } state Version lastCompactedVersion = wait(getLastCompactedVersion(self)); state Standalone> mutations = wait(getMutations(self, lastCompactedVersion + 1, req.generation.committedVersion)); for (const auto& versionedMutation : mutations) { auto configClass = versionedMutation.mutation.getConfigClass(); if (configClass.present()) { configClassesSet.insert(configClass.get()); } } Standalone> configClasses; for (const auto& configClass : configClassesSet) { configClasses.push_back_deep(configClasses.arena(), configClass); } req.reply.send(ConfigTransactionGetConfigClassesReply{ configClasses }); return Void(); } // Retrieve all knobs explicitly defined for the specified configuration class ACTOR static Future getKnobs(ConfigNodeImpl* self, ConfigTransactionGetKnobsRequest req) { ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.generation != currentGeneration) { req.reply.sendError(transaction_too_old()); return Void(); } // FIXME: Filtering after reading from disk is very inefficient state RangeResult snapshot = wait(self->kvStore->readRange(kvKeys)); state std::set knobSet; for (const auto& kv : snapshot) { auto configKey = BinaryReader::fromStringRef(kv.key.removePrefix(kvKeys.begin), IncludeVersion()); if (configKey.configClass.template castTo() == req.configClass) { knobSet.insert(configKey.knobName); } } state Version lastCompactedVersion = wait(getLastCompactedVersion(self)); state Standalone> mutations = wait(getMutations(self, lastCompactedVersion + 1, req.generation.committedVersion)); for (const auto& versionedMutation : mutations) { if (versionedMutation.mutation.getConfigClass().template castTo() == req.configClass) { if (versionedMutation.mutation.isSet()) { knobSet.insert(versionedMutation.mutation.getKnobName()); } else { knobSet.erase(versionedMutation.mutation.getKnobName()); } } } Standalone> knobNames; for (const auto& knobName : knobSet) { knobNames.push_back_deep(knobNames.arena(), knobName); } req.reply.send(ConfigTransactionGetKnobsReply{ knobNames }); return Void(); } ACTOR static Future commitMutations(ConfigNodeImpl* self, Standalone> mutations, Standalone> annotations, Version commitVersion) { Version latestVersion = 0; int index = 0; for (const auto& mutation : mutations) { if (mutation.version > commitVersion) { continue; } // Mutations should be in ascending version order. ASSERT_GE(mutation.version, latestVersion); if (mutation.version > latestVersion) { latestVersion = mutation.version; index = 0; } Key key = versionedMutationKey(mutation.version, index++); Value value = ObjectWriter::toValue(mutation.mutation, IncludeVersion()); if (mutation.mutation.isSet()) { TraceEvent("ConfigNodeSetting") .detail("ConfigClass", mutation.mutation.getConfigClass()) .detail("KnobName", mutation.mutation.getKnobName()) .detail("Value", mutation.mutation.getValue().toString()) .detail("Version", mutation.version); ++self->setMutations; } else { ++self->clearMutations; } self->kvStore->set(KeyValueRef(key, value)); } for (const auto& annotation : annotations) { self->kvStore->set(KeyValueRef(versionedAnnotationKey(annotation.version), BinaryWriter::toValue(annotation.annotation, IncludeVersion()))); } ConfigGeneration newGeneration = { commitVersion, commitVersion }; self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion()))); wait(self->kvStore->commit()); ++self->successfulCommits; return Void(); } ACTOR static Future commit(ConfigNodeImpl* self, ConfigTransactionCommitRequest req) { ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.generation.committedVersion != currentGeneration.committedVersion) { ++self->failedCommits; req.reply.sendError(commit_unknown_result()); return Void(); } else if (req.generation.liveVersion != currentGeneration.liveVersion) { ++self->failedCommits; req.reply.sendError(not_committed()); return Void(); } Standalone> mutations; for (const auto& mutation : req.mutations) { mutations.emplace_back_deep(mutations.arena(), req.generation.liveVersion, mutation); } Standalone> annotations; annotations.emplace_back_deep(annotations.arena(), req.generation.liveVersion, req.annotation); wait(commitMutations(self, mutations, annotations, req.generation.liveVersion)); req.reply.send(Void()); return Void(); } ACTOR static Future serve(ConfigNodeImpl* self, ConfigTransactionInterface const* cti) { loop { choose { when(ConfigTransactionGetGenerationRequest req = waitNext(cti->getGeneration.getFuture())) { ++self->getGenerationRequests; wait(getNewGeneration(self, req)); } when(ConfigTransactionGetRequest req = waitNext(cti->get.getFuture())) { ++self->getValueRequests; wait(get(self, req)); } when(ConfigTransactionCommitRequest req = waitNext(cti->commit.getFuture())) { wait(commit(self, req)); } when(ConfigTransactionGetConfigClassesRequest req = waitNext(cti->getClasses.getFuture())) { wait(getConfigClasses(self, req)); } when(ConfigTransactionGetKnobsRequest req = waitNext(cti->getKnobs.getFuture())) { wait(getKnobs(self, req)); } when(wait(self->kvStore->getError())) { ASSERT(false); } } } } ACTOR static Future getSnapshotAndChanges(ConfigNodeImpl* self, ConfigFollowerGetSnapshotAndChangesRequest req) { state ConfigFollowerGetSnapshotAndChangesReply reply; RangeResult data = wait(self->kvStore->readRange(kvKeys)); for (const auto& kv : data) { reply .snapshot[BinaryReader::fromStringRef(kv.key.removePrefix(kvKeys.begin), IncludeVersion())] = ObjectReader::fromStringRef(kv.value, IncludeVersion()); } wait(store(reply.snapshotVersion, getLastCompactedVersion(self))); wait(store(reply.changes, getMutations(self, reply.snapshotVersion + 1, req.mostRecentVersion))); wait(store(reply.annotations, getAnnotations(self, reply.snapshotVersion + 1, req.mostRecentVersion))); TraceEvent(SevDebug, "ConfigNodeGettingSnapshot", self->id) .detail("SnapshotVersion", reply.snapshotVersion) .detail("SnapshotSize", reply.snapshot.size()) .detail("ChangesSize", reply.changes.size()); req.reply.send(reply); return Void(); } // Apply mutations from the WAL in mutationKeys into the kvKeys key space. // Periodic compaction prevents the database from growing too large, and improve read performance. // However, commit annotations for compacted mutations are lost ACTOR static Future compact(ConfigNodeImpl* self, ConfigFollowerCompactRequest req) { state Version lastCompactedVersion = wait(getLastCompactedVersion(self)); TraceEvent(SevDebug, "ConfigNodeCompacting", self->id) .detail("Version", req.version) .detail("LastCompacted", lastCompactedVersion); if (req.version <= lastCompactedVersion) { req.reply.send(Void()); return Void(); } Standalone> versionedMutations = wait(getMutations(self, lastCompactedVersion + 1, req.version)); self->kvStore->clear( KeyRangeRef(versionedMutationKey(lastCompactedVersion + 1, 0), versionedMutationKey(req.version + 1, 0))); self->kvStore->clear( KeyRangeRef(versionedAnnotationKey(lastCompactedVersion + 1), versionedAnnotationKey(req.version + 1))); for (const auto& versionedMutation : versionedMutations) { const auto& version = versionedMutation.version; const auto& mutation = versionedMutation.mutation; if (version > req.version) { break; } else { TraceEvent(SevDebug, "ConfigNodeCompactionApplyingMutation", self->id) .detail("IsSet", mutation.isSet()) .detail("MutationVersion", version) .detail("LastCompactedVersion", lastCompactedVersion) .detail("ReqVersion", req.version); auto serializedKey = BinaryWriter::toValue(mutation.getKey(), IncludeVersion()); if (mutation.isSet()) { self->kvStore->set(KeyValueRef(serializedKey.withPrefix(kvKeys.begin), ObjectWriter::toValue(mutation.getValue(), IncludeVersion()))); } else { self->kvStore->clear(singleKeyRange(serializedKey.withPrefix(kvKeys.begin))); } lastCompactedVersion = version; } } self->kvStore->set( KeyValueRef(lastCompactedVersionKey, BinaryWriter::toValue(lastCompactedVersion, IncludeVersion()))); wait(self->kvStore->commit()); req.reply.send(Void()); return Void(); } ACTOR static Future rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) { Version lastCompactedVersion = wait(getLastCompactedVersion(self)); if (req.lastKnownCommitted < lastCompactedVersion) { req.reply.sendError(version_already_compacted()); return Void(); } state ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.lastKnownCommitted != currentGeneration.committedVersion) { req.reply.sendError(transaction_too_old()); return Void(); } TraceEvent("ConfigNodeRollforward") .detail("RollbackTo", req.rollback) .detail("Target", req.target) .detail("LastKnownCommitted", req.lastKnownCommitted) .detail("Committed", currentGeneration.committedVersion); // Rollback to prior known committed version to erase any commits not // made on a quorum. if (req.rollback.present() && req.rollback.get() < currentGeneration.committedVersion) { if (g_network->isSimulated()) { RangeResult mutationRange = wait(self->kvStore->readRange( KeyRangeRef(versionedMutationKey(req.rollback.get() + 1, 0), versionedMutationKey(currentGeneration.committedVersion + 1, 0)))); // assertCommitted(mutationRange, req.mutations, getVersionFromVersionedMutationKey); RangeResult annotationRange = wait(self->kvStore->readRange( KeyRangeRef(versionedAnnotationKey(req.rollback.get() + 1), versionedAnnotationKey(currentGeneration.committedVersion + 1)))); // assertCommitted(annotationRange, req.annotations, getVersionFromVersionedAnnotationKey); } self->kvStore->clear(KeyRangeRef(versionedMutationKey(req.rollback.get() + 1, 0), versionedMutationKey(currentGeneration.committedVersion + 1, 0))); self->kvStore->clear(KeyRangeRef(versionedAnnotationKey(req.rollback.get() + 1), versionedAnnotationKey(currentGeneration.committedVersion + 1))); currentGeneration.committedVersion = req.rollback.get(); // The mutation commit loop below should persist the new generation // to disk, so we don't need to do it here. } // Now rollforward by applying all mutations between last known // committed version and rollforward version. ASSERT_GT(req.mutations[0].version, currentGeneration.committedVersion); wait(commitMutations(self, req.mutations, req.annotations, req.target)); req.reply.send(Void()); return Void(); } ACTOR static Future getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) { ConfigGeneration generation = wait(getGeneration(self)); req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion }); return Void(); } ACTOR static Future serve(ConfigNodeImpl* self, ConfigFollowerInterface const* cfi) { loop { choose { when(ConfigFollowerGetSnapshotAndChangesRequest req = waitNext(cfi->getSnapshotAndChanges.getFuture())) { ++self->snapshotRequests; wait(getSnapshotAndChanges(self, req)); } when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) { wait(getChanges(self, req)); } when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) { ++self->compactRequests; wait(compact(self, req)); } when(ConfigFollowerRollforwardRequest req = waitNext(cfi->rollforward.getFuture())) { ++self->rollforwardRequests; wait(rollforward(self, req)); } when(ConfigFollowerGetCommittedVersionRequest req = waitNext(cfi->getCommittedVersion.getFuture())) { ++self->getCommittedVersionRequests; wait(getCommittedVersion(self, req)); } when(wait(self->kvStore->getError())) { ASSERT(false); } } } } ACTOR static Future serve(ConfigNodeImpl* self, ConfigBroadcastInterface const* cbi, bool infinite) { loop { choose { when(state ConfigBroadcastRegisteredRequest req = waitNext(cbi->registered.getFuture())) { bool isRegistered = wait(registered(self)); req.reply.send(ConfigBroadcastRegisteredReply{ isRegistered }); } when(ConfigBroadcastReadyRequest readyReq = waitNext(cbi->ready.getFuture())) { readyReq.reply.send(ConfigBroadcastReadyReply{}); if (!infinite) { return Void(); } } } } } ACTOR static Future serve(ConfigNodeImpl* self, ConfigBroadcastInterface const* cbi, ConfigTransactionInterface const* cti, ConfigFollowerInterface const* cfi) { wait(serve(self, cbi, false)); self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(true, IncludeVersion()))); wait(self->kvStore->commit()); // Shouldn't return (coordinationServer will throw an error if it does). wait(serve(self, cbi, true) || serve(self, cti) || serve(self, cfi)); return Void(); } ACTOR static Future registered(ConfigNodeImpl* self) { Optional value = wait(self->kvStore->readValue(registeredKey)); return value.present(); } public: ConfigNodeImpl(std::string const& folder) : id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), cc("ConfigNode"), compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc), rollforwardRequests("RollforwardRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc), failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc), getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc), failedCommits("FailedCommits", cc), setMutations("SetMutations", cc), clearMutations("ClearMutations", cc), getValueRequests("GetValueRequests", cc), getGenerationRequests("GetGenerationRequests", cc) { logger = traceCounters("ConfigNodeMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigNode"); TraceEvent(SevDebug, "StartingConfigNode", id).detail("KVStoreAlreadyExists", kvStore.exists()); } Future serve(ConfigBroadcastInterface const& cbi, ConfigTransactionInterface const& cti, ConfigFollowerInterface const& cfi) { return serve(this, &cbi, &cti, &cfi); } Future serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); } Future serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); } void close() { kvStore.close(); } Future onClosed() { return kvStore.onClosed(); } }; ConfigNode::ConfigNode(std::string const& folder) : impl(PImpl::create(folder)) {} ConfigNode::~ConfigNode() = default; Future ConfigNode::serve(ConfigBroadcastInterface const& cbi, ConfigTransactionInterface const& cti, ConfigFollowerInterface const& cfi) { return impl->serve(cbi, cti, cfi); } Future ConfigNode::serve(ConfigTransactionInterface const& cti) { return impl->serve(cti); } Future ConfigNode::serve(ConfigFollowerInterface const& cfi) { return impl->serve(cfi); } void ConfigNode::close() { impl->close(); } Future ConfigNode::onClosed() { return impl->onClosed(); }