/* * LocalConfiguration.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "fdbclient/IKnobCollection.h" #include "fdbrpc/Stats.h" #include "fdbserver/ConfigBroadcastFollowerInterface.h" #include "fdbserver/IKeyValueStore.h" #include "fdbserver/LocalConfiguration.h" #include "fdbserver/OnDemandStore.h" #include "flow/UnitTest.h" #include "flow/actorcompiler.h" // This must be the last #include. namespace { const KeyRef configPathKey = "configPath"_sr; const KeyRef lastSeenVersionKey = "lastSeenVersion"_sr; const KeyRangeRef knobOverrideKeys = KeyRangeRef("knobOverride/"_sr, "knobOverride0"_sr); KeyRef stringToKeyRef(std::string const& s) { return StringRef(reinterpret_cast(s.c_str()), s.size()); } class ConfigKnobOverrides { Standalone> configPath; std::map, std::map> configClassToKnobToValue; public: ConfigKnobOverrides() = default; explicit ConfigKnobOverrides(std::string const& paramString) { configClassToKnobToValue[{}] = {}; if (std::all_of(paramString.begin(), paramString.end(), [](char c) { return isalpha(c) || isdigit(c) || c == '/' || c == '-'; })) { StringRef s = stringToKeyRef(paramString); while (s.size()) { configPath.push_back_deep(configPath.arena(), s.eat("/"_sr)); configClassToKnobToValue[configPath.back()] = {}; } } else { fprintf(stderr, "WARNING: Invalid configuration path: `%s'\n", paramString.c_str()); } } ConfigClassSet getConfigClassSet() const { return ConfigClassSet(configPath); } void set(Optional configClass, KeyRef knobName, KnobValueRef value) { configClassToKnobToValue[configClass.castTo()][knobName] = value; } void remove(Optional configClass, KeyRef knobName) { configClassToKnobToValue[configClass.castTo()].erase(knobName); } void update(IKnobCollection& knobCollection) const { // Apply global overrides const auto& knobToValue = configClassToKnobToValue.at({}); for (const auto& [knobName, knobValue] : knobToValue) { knobCollection.setKnob(knobName.toString(), knobValue); } // Apply specific overrides for (const auto& configClass : configPath) { const auto& knobToValue = configClassToKnobToValue.at(configClass); for (const auto& [knobName, knobValue] : knobToValue) { knobCollection.setKnob(knobName.toString(), knobValue); } } } bool hasSameConfigPath(ConfigKnobOverrides const& other) const { return configPath == other.configPath; } template void serialize(Ar& ar) { serializer(ar, configPath); } }; class ManualKnobOverrides { std::map overrides; public: explicit ManualKnobOverrides(std::map const& overrides) { for (const auto& [knobName, knobValueString] : overrides) { try { auto knobValue = IKnobCollection::parseKnobValue(knobName, knobValueString, IKnobCollection::Type::TEST); this->overrides[stringToKeyRef(knobName)] = knobValue; } catch (Error& e) { if (e.code() == error_code_invalid_option) { fprintf(stderr, "WARNING: Unrecognized knob option '%s'\n", knobName.c_str()); TraceEvent(SevWarnAlways, "UnrecognizedKnobOption").detail("Knob", printable(knobName)); } else { throw e; } } } } void update(IKnobCollection& knobCollection) { for (const auto& [knobName, knobValue] : overrides) { knobCollection.setKnob(knobName.toString(), knobValue); } } }; } // namespace class LocalConfigurationImpl { UID id; OnDemandStore kvStore; Future initFuture; Version lastSeenVersion{ 0 }; ManualKnobOverrides manualKnobOverrides; ConfigKnobOverrides configKnobOverrides; std::unique_ptr testKnobCollection; IKnobCollection& getKnobs() { return testKnobCollection ? *testKnobCollection : IKnobCollection::getMutableGlobalKnobCollection(); } IKnobCollection const& getKnobs() const { return testKnobCollection ? *testKnobCollection : IKnobCollection::getGlobalKnobCollection(); } CounterCollection cc; Counter broadcasterChanges; Counter snapshots; Counter changeRequestsFetched; Counter mutations; Future logger; ACTOR static Future saveConfigPath(LocalConfigurationImpl* self) { self->kvStore->set( KeyValueRef(configPathKey, BinaryWriter::toValue(self->configKnobOverrides, IncludeVersion()))); wait(self->kvStore->commit()); return Void(); } ACTOR static Future clearKVStore(LocalConfigurationImpl* self) { self->kvStore->clear(singleKeyRange(configPathKey)); self->kvStore->clear(knobOverrideKeys); wait(self->kvStore->commit()); return Void(); } ACTOR static Future getLastSeenVersion(LocalConfigurationImpl* self) { state Version result = 0; state Optional lastSeenVersionValue = wait(self->kvStore->readValue(lastSeenVersionKey)); if (!lastSeenVersionValue.present()) { self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(result, IncludeVersion()))); wait(self->kvStore->commit()); } else { result = BinaryReader::fromStringRef(lastSeenVersionValue.get(), IncludeVersion()); } return result; } ACTOR static Future initialize(LocalConfigurationImpl* self) { state Version lastSeenVersion = wait(getLastSeenVersion(self)); state Optional storedConfigPathValue = wait(self->kvStore->readValue(configPathKey)); if (!storedConfigPathValue.present()) { wait(saveConfigPath(self)); self->updateInMemoryState(lastSeenVersion); return Void(); } state ConfigKnobOverrides storedConfigPath = BinaryReader::fromStringRef(storedConfigPathValue.get(), IncludeVersion()); if (!storedConfigPath.hasSameConfigPath(self->configKnobOverrides)) { // All local information is outdated wait(clearKVStore(self)); wait(saveConfigPath(self)); self->updateInMemoryState(lastSeenVersion); return Void(); } Standalone range = wait(self->kvStore->readRange(knobOverrideKeys)); for (const auto& kv : range) { auto configKey = BinaryReader::fromStringRef(kv.key.removePrefix(knobOverrideKeys.begin), IncludeVersion()); self->configKnobOverrides.set(configKey.configClass, configKey.knobName, ObjectReader::fromStringRef(kv.value, IncludeVersion())); } self->updateInMemoryState(lastSeenVersion); return Void(); } void updateInMemoryState(Version lastSeenVersion) { this->lastSeenVersion = lastSeenVersion; // TODO: Support randomization? getKnobs().reset(Randomize::NO, g_network->isSimulated() ? IsSimulated::YES : IsSimulated::NO); configKnobOverrides.update(getKnobs()); manualKnobOverrides.update(getKnobs()); // Must reinitialize in order to update dependent knobs getKnobs().initialize(Randomize::NO, g_network->isSimulated() ? IsSimulated::YES : IsSimulated::NO); } ACTOR static Future setSnapshot(LocalConfigurationImpl* self, std::map snapshot, Version snapshotVersion) { // TODO: Concurrency control? ASSERT(self->initFuture.isValid() && self->initFuture.isReady()); ++self->snapshots; self->kvStore->clear(knobOverrideKeys); for (const auto& [configKey, knobValue] : snapshot) { self->configKnobOverrides.set(configKey.configClass, configKey.knobName, knobValue); self->kvStore->set( KeyValueRef(BinaryWriter::toValue(configKey, IncludeVersion()).withPrefix(knobOverrideKeys.begin), ObjectWriter::toValue(knobValue, IncludeVersion()))); } ASSERT_GE(snapshotVersion, self->lastSeenVersion); self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(snapshotVersion, IncludeVersion()))); wait(self->kvStore->commit()); self->updateInMemoryState(snapshotVersion); return Void(); } ACTOR static Future addChanges(LocalConfigurationImpl* self, Standalone> changes, Version mostRecentVersion) { // TODO: Concurrency control? ASSERT(self->initFuture.isValid() && self->initFuture.isReady()); ++self->changeRequestsFetched; for (const auto& versionedMutation : changes) { ++self->mutations; const auto& mutation = versionedMutation.mutation; auto serializedKey = BinaryWriter::toValue(mutation.getKey(), IncludeVersion()); if (mutation.isSet()) { self->kvStore->set(KeyValueRef(serializedKey.withPrefix(knobOverrideKeys.begin), ObjectWriter::toValue(mutation.getValue(), IncludeVersion()))); self->configKnobOverrides.set(mutation.getConfigClass(), mutation.getKnobName(), mutation.getValue()); } else { self->kvStore->clear(singleKeyRange(serializedKey.withPrefix(knobOverrideKeys.begin))); self->configKnobOverrides.remove(mutation.getConfigClass(), mutation.getKnobName()); } } self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(mostRecentVersion, IncludeVersion()))); wait(self->kvStore->commit()); self->updateInMemoryState(mostRecentVersion); return Void(); } ACTOR static Future consumeInternal(LocalConfigurationImpl* self, ConfigBroadcastFollowerInterface broadcaster) { loop { try { state ConfigBroadcastFollowerGetChangesReply changesReply = wait(broadcaster.getChanges.getReply(ConfigBroadcastFollowerGetChangesRequest{ self->lastSeenVersion, self->configKnobOverrides.getConfigClassSet() })); TraceEvent(SevDebug, "LocalConfigGotChanges", self->id) .detail("Size", changesReply.changes.size()) .detail("Version", changesReply.mostRecentVersion); wait(self->addChanges(changesReply.changes, changesReply.mostRecentVersion)); } catch (Error& e) { if (e.code() == error_code_version_already_compacted) { state ConfigBroadcastFollowerGetSnapshotReply snapshotReply = wait(broadcaster.getSnapshot.getReply( ConfigBroadcastFollowerGetSnapshotRequest{ self->configKnobOverrides.getConfigClassSet() })); ASSERT_GT(snapshotReply.version, self->lastSeenVersion); ++self->snapshots; wait(setSnapshot(self, std::move(snapshotReply.snapshot), snapshotReply.version)); } else { throw e; } } wait(yield()); // Necessary to not immediately trigger retry? } } ACTOR static Future consume( LocalConfigurationImpl* self, Reference const> broadcaster) { ASSERT(self->initFuture.isValid() && self->initFuture.isReady()); loop { choose { when(wait(brokenPromiseToNever(consumeInternal(self, broadcaster->get())))) { ASSERT(false); } when(wait(broadcaster->onChange())) { ++self->broadcasterChanges; } when(wait(self->kvStore->getError())) { ASSERT(false); } } } } public: LocalConfigurationImpl(std::string const& dataFolder, std::string const& configPath, std::map const& manualKnobOverrides, IsTest isTest) : id(deterministicRandom()->randomUniqueID()), kvStore(dataFolder, id, "localconf-"), cc("LocalConfiguration"), broadcasterChanges("BroadcasterChanges", cc), snapshots("Snapshots", cc), changeRequestsFetched("ChangeRequestsFetched", cc), mutations("Mutations", cc), configKnobOverrides(configPath), manualKnobOverrides(manualKnobOverrides) { if (isTest == IsTest::YES) { testKnobCollection = IKnobCollection::create(IKnobCollection::Type::TEST, Randomize::NO, g_network->isSimulated() ? IsSimulated::YES : IsSimulated::NO); } logger = traceCounters( "LocalConfigurationMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "LocalConfigurationMetrics"); } Future initialize() { ASSERT(!initFuture.isValid()); initFuture = initialize(this); return initFuture; } Future addChanges(Standalone> changes, Version mostRecentVersion) { return addChanges(this, changes, mostRecentVersion); } FlowKnobs const& getFlowKnobs() const { ASSERT(initFuture.isValid() && initFuture.isReady()); return getKnobs().getFlowKnobs(); } ClientKnobs const& getClientKnobs() const { ASSERT(initFuture.isValid() && initFuture.isReady()); return getKnobs().getClientKnobs(); } ServerKnobs const& getServerKnobs() const { ASSERT(initFuture.isValid() && initFuture.isReady()); return getKnobs().getServerKnobs(); } TestKnobs const& getTestKnobs() const { ASSERT(initFuture.isValid() && initFuture.isReady()); return getKnobs().getTestKnobs(); } Future consume(Reference const> const& broadcaster) { return consume(this, broadcaster); } UID getID() const { return id; } }; LocalConfiguration::LocalConfiguration(std::string const& dataFolder, std::string const& configPath, std::map const& manualKnobOverrides, IsTest isTest) : _impl(std::make_unique(dataFolder, configPath, manualKnobOverrides, isTest)) {} LocalConfiguration::LocalConfiguration(LocalConfiguration&&) = default; LocalConfiguration& LocalConfiguration::operator=(LocalConfiguration&&) = default; LocalConfiguration::~LocalConfiguration() = default; Future LocalConfiguration::initialize() { return impl().initialize(); } FlowKnobs const& LocalConfiguration::getFlowKnobs() const { return impl().getFlowKnobs(); } ClientKnobs const& LocalConfiguration::getClientKnobs() const { return impl().getClientKnobs(); } ServerKnobs const& LocalConfiguration::getServerKnobs() const { return impl().getServerKnobs(); } TestKnobs const& LocalConfiguration::getTestKnobs() const { return impl().getTestKnobs(); } Future LocalConfiguration::consume( Reference const> const& broadcaster) { return impl().consume(broadcaster); } Future LocalConfiguration::addChanges(Standalone> changes, Version mostRecentVersion) { return impl().addChanges(changes, mostRecentVersion); } UID LocalConfiguration::getID() const { return impl().getID(); }