diff --git a/fdbclient/ConfigDatabaseInterface.h b/fdbclient/ConfigDatabaseInterface.h index 5c479d6863..4d1355877b 100644 --- a/fdbclient/ConfigDatabaseInterface.h +++ b/fdbclient/ConfigDatabaseInterface.h @@ -79,11 +79,11 @@ struct ConfigDatabaseGetRequest { struct ConfigDatabaseCommitRequest { static constexpr FileIdentifier file_identifier = 103841; Version version; - VectorRef mutations; + Standalone> mutations; ReplyPromise reply; ConfigDatabaseCommitRequest() = default; - ConfigDatabaseCommitRequest(Version version, VectorRef mutations) + ConfigDatabaseCommitRequest(Version version, Standalone> mutations) : version(version), mutations(mutations) {} template @@ -98,7 +98,9 @@ struct ConfigDatabaseInterface { struct RequestStream get; struct RequestStream commit; - ConfigDatabaseInterface() { + ConfigDatabaseInterface() = default; + + void setupWellKnownEndpoints() { getVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_GETVERSION, TaskPriority::Coordination); get.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_GET, TaskPriority::Coordination); commit.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_COMMIT, TaskPriority::Coordination); @@ -106,7 +108,7 @@ struct ConfigDatabaseInterface { ConfigDatabaseInterface(NetworkAddress const& remote) : getVersion(Endpoint({ remote }, WLTOKEN_CONFIGDB_GETVERSION)), get(Endpoint({ remote }, WLTOKEN_CONFIGDB_GET)), - commit(Endpoint({ remote }, WLTOKEN_CONFIGDB_GET)) {} + commit(Endpoint({ remote }, WLTOKEN_CONFIGDB_COMMIT)) {} template void serialize(Ar& ar) { diff --git a/fdbclient/IConfigurationDatabase.h b/fdbclient/IConfigurationDatabase.h index f7aa904c58..bea7842285 100644 --- a/fdbclient/IConfigurationDatabase.h +++ b/fdbclient/IConfigurationDatabase.h @@ -35,6 +35,7 @@ public: virtual Future> get(KeyRef) = 0; virtual Future commit() = 0; virtual Future onError(Error const&) = 0; + virtual void reset() = 0; }; class SimpleConfigurationTransaction : public IConfigurationTransaction { @@ -48,4 +49,5 @@ public: Future> get(KeyRef) override; Future commit() override; Future onError(Error const&) override; + void reset() override; }; diff --git a/fdbclient/SimpleConfigurationDatabase.actor.cpp b/fdbclient/SimpleConfigurationDatabase.actor.cpp index 2772fff0cb..5dd3afc707 100644 --- a/fdbclient/SimpleConfigurationDatabase.actor.cpp +++ b/fdbclient/SimpleConfigurationDatabase.actor.cpp @@ -26,8 +26,7 @@ #include "flow/actorcompiler.h" // This must be the last #include. class SimpleConfigurationTransactionImpl { - Arena arena; - VectorRef mutations; + Standalone> mutations; Future version; ConfigDatabaseInterface cdbi; @@ -62,17 +61,24 @@ public: cdbi = ConfigDatabaseInterface(coordinators[0]); } - void set(KeyRef key, ValueRef value) { mutations.emplace_back(arena, MutationRef::Type::SetValue, key, value); } + void set(KeyRef key, ValueRef value) { + mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::SetValue, key, value); + } void clearRange(KeyRef begin, KeyRef end) { - mutations.emplace_back(arena, MutationRef::Type::ClearRange, begin, end); + mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::ClearRange, begin, end); } Future> get(KeyRef key) { return get(this, key); } Future commit() { return commit(this); } - Future onError(Error const& e) { return Void(); } + Future onError(Error const& e) { throw e; } + + void reset() { + version.cancel(); + mutations = Standalone>{}; + } }; void SimpleConfigurationTransaction::set(KeyRef key, ValueRef value) { @@ -95,6 +101,10 @@ Future SimpleConfigurationTransaction::onError(Error const& e) { return impl->onError(e); } +void SimpleConfigurationTransaction::reset() { + return impl->reset(); +} + SimpleConfigurationTransaction::SimpleConfigurationTransaction(ClusterConnectionString const& ccs) : impl(std::make_unique(ccs)) {} diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index ec38714d15..08a4706408 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -20,6 +20,7 @@ #include "fdbclient/ConfigDatabaseInterface.h" #include "fdbserver/CoordinationInterface.h" +#include "fdbserver/IConfigDatabaseNode.h" #include "fdbserver/IKeyValueStore.h" #include "fdbserver/Knobs.h" #include "fdbserver/WorkerInterface.actor.h" @@ -618,6 +619,8 @@ ACTOR Future coordinationServer(std::string dataFolder) { state GenerationRegInterface myInterface(g_network); state OnDemandStore store(dataFolder, myID); state ConfigDatabaseInterface configDatabaseInterface; + configDatabaseInterface.setupWellKnownEndpoints(); + state Reference configDatabaseNode = makeReference(dataFolder); TraceEvent("CoordinationServer", myID) .detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()) @@ -625,7 +628,7 @@ ACTOR Future coordinationServer(std::string dataFolder) { try { wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) || - store.getError()); + store.getError() || configDatabaseNode->serve(configDatabaseInterface)); throw internal_error(); } catch (Error& e) { TraceEvent("CoordinationServerError", myID).error(e, true); diff --git a/fdbserver/IConfigDatabaseNode.h b/fdbserver/IConfigDatabaseNode.h index b799bc79e1..5db473bf99 100644 --- a/fdbserver/IConfigDatabaseNode.h +++ b/fdbserver/IConfigDatabaseNode.h @@ -21,11 +21,12 @@ #pragma once #include "fdbclient/ConfigDatabaseInterface.h" +#include "flow/FastRef.h" #include "flow/flow.h" #include -class IConfigDatabaseNode { +class IConfigDatabaseNode : public ReferenceCounted { public: virtual Future serve(ConfigDatabaseInterface&) = 0; }; @@ -34,7 +35,7 @@ class SimpleConfigDatabaseNode : public IConfigDatabaseNode { std::unique_ptr impl; public: - SimpleConfigDatabaseNode(std::string const& fileName); + SimpleConfigDatabaseNode(std::string const& dataFolder); ~SimpleConfigDatabaseNode(); Future serve(ConfigDatabaseInterface&) override; }; diff --git a/fdbserver/SimpleConfigDatabaseNode.actor.cpp b/fdbserver/SimpleConfigDatabaseNode.actor.cpp index dd17f1af65..61cb071b51 100644 --- a/fdbserver/SimpleConfigDatabaseNode.actor.cpp +++ b/fdbserver/SimpleConfigDatabaseNode.actor.cpp @@ -75,8 +75,11 @@ class SimpleConfigDatabaseNodeImpl { for (; index < req.mutations.size(); ++index) { const auto& mutation = req.mutations[index]; if (mutation.type == MutationRef::SetValue) { + self->config[mutation.param1.toString()] = mutation.param2.toString(); self->kvStore->set(KeyValueRef(mutation.param1, mutation.param2)); } else if (mutation.type == MutationRef::ClearRange) { + self->config.erase(self->config.find(mutation.param1.toString()), + self->config.find(mutation.param2.toString())); self->kvStore->clear(KeyRangeRef(mutation.param1, mutation.param2)); } else { ASSERT(false); @@ -89,8 +92,9 @@ class SimpleConfigDatabaseNodeImpl { } ACTOR static Future readKVStoreIntoMemory(SimpleConfigDatabaseNodeImpl* self) { + wait(self->kvStore->init()); state Optional onDiskVersion = wait(self->kvStore->readValue(versionKey)); - if (onDiskVersion.present()) { + if (!onDiskVersion.present()) { // Brand new database self->currentVersion = 0; BinaryWriter wr(IncludeVersion()); @@ -109,7 +113,6 @@ class SimpleConfigDatabaseNodeImpl { return Void(); } -public: ACTOR static Future serve(SimpleConfigDatabaseNodeImpl* self, ConfigDatabaseInterface* cdbi) { wait(readKVStoreIntoMemory(self)); loop { @@ -129,14 +132,17 @@ public: } public: - SimpleConfigDatabaseNodeImpl(std::string const& fileName) - : kvStore(keyValueStoreSQLite(fileName, UID{}, KeyValueStoreType::SSD_BTREE_V2, true, true)) {} + SimpleConfigDatabaseNodeImpl(std::string const& dataFolder) { + platform::createDirectory(dataFolder); + kvStore = + keyValueStoreMemory(joinPath(dataFolder, "globalconf-"), deterministicRandom()->randomUniqueID(), 500e6); + } Future serve(ConfigDatabaseInterface& cdbi) { return serve(this, &cdbi); } }; -SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& fileName) - : impl(std::make_unique(fileName)) {} +SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& dataFolder) + : impl(std::make_unique(dataFolder)) {} SimpleConfigDatabaseNode::~SimpleConfigDatabaseNode() = default; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 7e8ddbaf79..3caefd4bc1 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -275,6 +275,7 @@ StringRef fileStoragePrefix = LiteralStringRef("storage-"); StringRef fileLogDataPrefix = LiteralStringRef("log-"); StringRef fileVersionedLogDataPrefix = LiteralStringRef("log2-"); StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-"); +StringRef globalConfPrefix = LiteralStringRef("globalconf-"); StringRef tlogQueueExtension = LiteralStringRef("fdq"); enum class FilesystemCheck { diff --git a/fdbserver/workloads/ConfigurationDatabase.actor.cpp b/fdbserver/workloads/ConfigurationDatabase.actor.cpp index 03aae7ea25..321f4df630 100644 --- a/fdbserver/workloads/ConfigurationDatabase.actor.cpp +++ b/fdbserver/workloads/ConfigurationDatabase.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include "fdbclient/IConfigurationDatabase.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" @@ -25,10 +26,33 @@ #include "flow/actorcompiler.h" // has to be last include class ConfigurationDatabaseWorkload : public TestWorkload { + ACTOR static Future start(ConfigurationDatabaseWorkload* self, Database cx) { + state SimpleConfigurationTransaction tr(cx->getConnectionFile()->getConnectionString()); + state Key k = LiteralStringRef("config/x"); + state Key v = LiteralStringRef("x"); + { + Optional currentValue = wait(tr.get(k)); + ASSERT(!currentValue.present()); + } + wait(delay(1)); + { + tr.reset(); + tr.set(k, v); + wait(tr.commit()); + } + wait(delay(1)); + { + tr.reset(); + Optional currentValue = wait(tr.get(k)); + ASSERT(currentValue.get() == v); + } + return Void(); + } + public: ConfigurationDatabaseWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {} Future setup(Database const& cx) override { return Void(); } - Future start(Database const& cx) override { return Void(); } + Future start(Database const& cx) override { return clientId ? Void() : start(this, cx); } Future check(Database const& cx) override { return true; } std::string description() const override { return "ConfigurationDatabase"; } void getMetrics(std::vector& m) override {}