From ca3b7f5ef168d69fd04802baaa38d65e9df0215b Mon Sep 17 00:00:00 2001
From: sfc-gh-tclinkenbeard <trevor.clinkenbeard@snowflake.com>
Date: Thu, 15 Apr 2021 12:44:45 -0700
Subject: [PATCH] Implemented simple ConfigurationDatabase workload, run simple
 configuration database node on coordinators

---
 fdbclient/ConfigDatabaseInterface.h           | 10 ++++---
 fdbclient/IConfigurationDatabase.h            |  2 ++
 .../SimpleConfigurationDatabase.actor.cpp     | 20 ++++++++++----
 fdbserver/Coordination.actor.cpp              |  5 +++-
 fdbserver/IConfigDatabaseNode.h               |  5 ++--
 fdbserver/SimpleConfigDatabaseNode.actor.cpp  | 18 ++++++++-----
 fdbserver/worker.actor.cpp                    |  1 +
 .../workloads/ConfigurationDatabase.actor.cpp | 26 ++++++++++++++++++-
 8 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/fdbclient/ConfigDatabaseInterface.h b/fdbclient/ConfigDatabaseInterface.h
index 5c479d6863..4d1355877b 100644
--- a/fdbclient/ConfigDatabaseInterface.h
+++ b/fdbclient/ConfigDatabaseInterface.h
@@ -79,11 +79,11 @@ struct ConfigDatabaseGetRequest {
 struct ConfigDatabaseCommitRequest {
 	static constexpr FileIdentifier file_identifier = 103841;
 	Version version;
-	VectorRef<MutationRef> mutations;
+	Standalone<VectorRef<MutationRef>> mutations;
 	ReplyPromise<Void> reply;
 
 	ConfigDatabaseCommitRequest() = default;
-	ConfigDatabaseCommitRequest(Version version, VectorRef<MutationRef> mutations)
+	ConfigDatabaseCommitRequest(Version version, Standalone<VectorRef<MutationRef>> mutations)
 	  : version(version), mutations(mutations) {}
 
 	template <class Ar>
@@ -98,7 +98,9 @@ struct ConfigDatabaseInterface {
 	struct RequestStream<ConfigDatabaseGetRequest> get;
 	struct RequestStream<ConfigDatabaseCommitRequest> commit;
 
-	ConfigDatabaseInterface() {
+	ConfigDatabaseInterface() = default;
+
+	void setupWellKnownEndpoints() {
 		getVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_GETVERSION, TaskPriority::Coordination);
 		get.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_GET, TaskPriority::Coordination);
 		commit.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_COMMIT, TaskPriority::Coordination);
@@ -106,7 +108,7 @@ struct ConfigDatabaseInterface {
 
 	ConfigDatabaseInterface(NetworkAddress const& remote)
 	  : getVersion(Endpoint({ remote }, WLTOKEN_CONFIGDB_GETVERSION)), get(Endpoint({ remote }, WLTOKEN_CONFIGDB_GET)),
-	    commit(Endpoint({ remote }, WLTOKEN_CONFIGDB_GET)) {}
+	    commit(Endpoint({ remote }, WLTOKEN_CONFIGDB_COMMIT)) {}
 
 	template <class Ar>
 	void serialize(Ar& ar) {
diff --git a/fdbclient/IConfigurationDatabase.h b/fdbclient/IConfigurationDatabase.h
index f7aa904c58..bea7842285 100644
--- a/fdbclient/IConfigurationDatabase.h
+++ b/fdbclient/IConfigurationDatabase.h
@@ -35,6 +35,7 @@ public:
 	virtual Future<Optional<Value>> get(KeyRef) = 0;
 	virtual Future<Void> commit() = 0;
 	virtual Future<Void> onError(Error const&) = 0;
+	virtual void reset() = 0;
 };
 
 class SimpleConfigurationTransaction : public IConfigurationTransaction {
@@ -48,4 +49,5 @@ public:
 	Future<Optional<Value>> get(KeyRef) override;
 	Future<Void> commit() override;
 	Future<Void> onError(Error const&) override;
+	void reset() override;
 };
diff --git a/fdbclient/SimpleConfigurationDatabase.actor.cpp b/fdbclient/SimpleConfigurationDatabase.actor.cpp
index 2772fff0cb..5dd3afc707 100644
--- a/fdbclient/SimpleConfigurationDatabase.actor.cpp
+++ b/fdbclient/SimpleConfigurationDatabase.actor.cpp
@@ -26,8 +26,7 @@
 #include "flow/actorcompiler.h" // This must be the last #include.
 
 class SimpleConfigurationTransactionImpl {
-	Arena arena;
-	VectorRef<MutationRef> mutations;
+	Standalone<VectorRef<MutationRef>> mutations;
 	Future<Version> version;
 	ConfigDatabaseInterface cdbi;
 
@@ -62,17 +61,24 @@ public:
 		cdbi = ConfigDatabaseInterface(coordinators[0]);
 	}
 
-	void set(KeyRef key, ValueRef value) { mutations.emplace_back(arena, MutationRef::Type::SetValue, key, value); }
+	void set(KeyRef key, ValueRef value) {
+		mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::SetValue, key, value);
+	}
 
 	void clearRange(KeyRef begin, KeyRef end) {
-		mutations.emplace_back(arena, MutationRef::Type::ClearRange, begin, end);
+		mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::ClearRange, begin, end);
 	}
 
 	Future<Optional<Value>> get(KeyRef key) { return get(this, key); }
 
 	Future<Void> commit() { return commit(this); }
 
-	Future<Void> onError(Error const& e) { return Void(); }
+	Future<Void> onError(Error const& e) { throw e; }
+
+	void reset() {
+		version.cancel();
+		mutations = Standalone<VectorRef<MutationRef>>{};
+	}
 };
 
 void SimpleConfigurationTransaction::set(KeyRef key, ValueRef value) {
@@ -95,6 +101,10 @@ Future<Void> SimpleConfigurationTransaction::onError(Error const& e) {
 	return impl->onError(e);
 }
 
+void SimpleConfigurationTransaction::reset() {
+	return impl->reset();
+}
+
 SimpleConfigurationTransaction::SimpleConfigurationTransaction(ClusterConnectionString const& ccs)
   : impl(std::make_unique<SimpleConfigurationTransactionImpl>(ccs)) {}
 
diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp
index ec38714d15..08a4706408 100644
--- a/fdbserver/Coordination.actor.cpp
+++ b/fdbserver/Coordination.actor.cpp
@@ -20,6 +20,7 @@
 
 #include "fdbclient/ConfigDatabaseInterface.h"
 #include "fdbserver/CoordinationInterface.h"
+#include "fdbserver/IConfigDatabaseNode.h"
 #include "fdbserver/IKeyValueStore.h"
 #include "fdbserver/Knobs.h"
 #include "fdbserver/WorkerInterface.actor.h"
@@ -618,6 +619,8 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
 	state GenerationRegInterface myInterface(g_network);
 	state OnDemandStore store(dataFolder, myID);
 	state ConfigDatabaseInterface configDatabaseInterface;
+	configDatabaseInterface.setupWellKnownEndpoints();
+	state Reference<IConfigDatabaseNode> configDatabaseNode = makeReference<SimpleConfigDatabaseNode>(dataFolder);
 
 	TraceEvent("CoordinationServer", myID)
 	    .detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress())
@@ -625,7 +628,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
 
 	try {
 		wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) ||
-		     store.getError());
+		     store.getError() || configDatabaseNode->serve(configDatabaseInterface));
 		throw internal_error();
 	} catch (Error& e) {
 		TraceEvent("CoordinationServerError", myID).error(e, true);
diff --git a/fdbserver/IConfigDatabaseNode.h b/fdbserver/IConfigDatabaseNode.h
index b799bc79e1..5db473bf99 100644
--- a/fdbserver/IConfigDatabaseNode.h
+++ b/fdbserver/IConfigDatabaseNode.h
@@ -21,11 +21,12 @@
 #pragma once
 
 #include "fdbclient/ConfigDatabaseInterface.h"
+#include "flow/FastRef.h"
 #include "flow/flow.h"
 
 #include <memory>
 
-class IConfigDatabaseNode {
+class IConfigDatabaseNode : public ReferenceCounted<IConfigDatabaseNode> {
 public:
 	virtual Future<Void> serve(ConfigDatabaseInterface&) = 0;
 };
@@ -34,7 +35,7 @@ class SimpleConfigDatabaseNode : public IConfigDatabaseNode {
 	std::unique_ptr<class SimpleConfigDatabaseNodeImpl> impl;
 
 public:
-	SimpleConfigDatabaseNode(std::string const& fileName);
+	SimpleConfigDatabaseNode(std::string const& dataFolder);
 	~SimpleConfigDatabaseNode();
 	Future<Void> serve(ConfigDatabaseInterface&) override;
 };
diff --git a/fdbserver/SimpleConfigDatabaseNode.actor.cpp b/fdbserver/SimpleConfigDatabaseNode.actor.cpp
index dd17f1af65..61cb071b51 100644
--- a/fdbserver/SimpleConfigDatabaseNode.actor.cpp
+++ b/fdbserver/SimpleConfigDatabaseNode.actor.cpp
@@ -75,8 +75,11 @@ class SimpleConfigDatabaseNodeImpl {
 		for (; index < req.mutations.size(); ++index) {
 			const auto& mutation = req.mutations[index];
 			if (mutation.type == MutationRef::SetValue) {
+				self->config[mutation.param1.toString()] = mutation.param2.toString();
 				self->kvStore->set(KeyValueRef(mutation.param1, mutation.param2));
 			} else if (mutation.type == MutationRef::ClearRange) {
+				self->config.erase(self->config.find(mutation.param1.toString()),
+				                   self->config.find(mutation.param2.toString()));
 				self->kvStore->clear(KeyRangeRef(mutation.param1, mutation.param2));
 			} else {
 				ASSERT(false);
@@ -89,8 +92,9 @@ class SimpleConfigDatabaseNodeImpl {
 	}
 
 	ACTOR static Future<Void> readKVStoreIntoMemory(SimpleConfigDatabaseNodeImpl* self) {
+		wait(self->kvStore->init());
 		state Optional<Value> onDiskVersion = wait(self->kvStore->readValue(versionKey));
-		if (onDiskVersion.present()) {
+		if (!onDiskVersion.present()) {
 			// Brand new database
 			self->currentVersion = 0;
 			BinaryWriter wr(IncludeVersion());
@@ -109,7 +113,6 @@ class SimpleConfigDatabaseNodeImpl {
 		return Void();
 	}
 
-public:
 	ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigDatabaseInterface* cdbi) {
 		wait(readKVStoreIntoMemory(self));
 		loop {
@@ -129,14 +132,17 @@ public:
 	}
 
 public:
-	SimpleConfigDatabaseNodeImpl(std::string const& fileName)
-	  : kvStore(keyValueStoreSQLite(fileName, UID{}, KeyValueStoreType::SSD_BTREE_V2, true, true)) {}
+	SimpleConfigDatabaseNodeImpl(std::string const& dataFolder) {
+		platform::createDirectory(dataFolder);
+		kvStore =
+		    keyValueStoreMemory(joinPath(dataFolder, "globalconf-"), deterministicRandom()->randomUniqueID(), 500e6);
+	}
 
 	Future<Void> serve(ConfigDatabaseInterface& cdbi) { return serve(this, &cdbi); }
 };
 
-SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& fileName)
-  : impl(std::make_unique<SimpleConfigDatabaseNodeImpl>(fileName)) {}
+SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& dataFolder)
+  : impl(std::make_unique<SimpleConfigDatabaseNodeImpl>(dataFolder)) {}
 
 SimpleConfigDatabaseNode::~SimpleConfigDatabaseNode() = default;
 
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 7e8ddbaf79..3caefd4bc1 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -275,6 +275,7 @@ StringRef fileStoragePrefix = LiteralStringRef("storage-");
 StringRef fileLogDataPrefix = LiteralStringRef("log-");
 StringRef fileVersionedLogDataPrefix = LiteralStringRef("log2-");
 StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-");
+StringRef globalConfPrefix = LiteralStringRef("globalconf-");
 StringRef tlogQueueExtension = LiteralStringRef("fdq");
 
 enum class FilesystemCheck {
diff --git a/fdbserver/workloads/ConfigurationDatabase.actor.cpp b/fdbserver/workloads/ConfigurationDatabase.actor.cpp
index 03aae7ea25..321f4df630 100644
--- a/fdbserver/workloads/ConfigurationDatabase.actor.cpp
+++ b/fdbserver/workloads/ConfigurationDatabase.actor.cpp
@@ -18,6 +18,7 @@
  * limitations under the License.
  */
 
+#include "fdbclient/IConfigurationDatabase.h"
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbserver/TesterInterface.actor.h"
 #include "fdbserver/workloads/workloads.actor.h"
@@ -25,10 +26,33 @@
 #include "flow/actorcompiler.h" // has to be last include
 
 class ConfigurationDatabaseWorkload : public TestWorkload {
+	ACTOR static Future<Void> start(ConfigurationDatabaseWorkload* self, Database cx) {
+		state SimpleConfigurationTransaction tr(cx->getConnectionFile()->getConnectionString());
+		state Key k = LiteralStringRef("config/x");
+		state Key v = LiteralStringRef("x");
+		{
+			Optional<Value> currentValue = wait(tr.get(k));
+			ASSERT(!currentValue.present());
+		}
+		wait(delay(1));
+		{
+			tr.reset();
+			tr.set(k, v);
+			wait(tr.commit());
+		}
+		wait(delay(1));
+		{
+			tr.reset();
+			Optional<Value> currentValue = wait(tr.get(k));
+			ASSERT(currentValue.get() == v);
+		}
+		return Void();
+	}
+
 public:
 	ConfigurationDatabaseWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
 	Future<Void> setup(Database const& cx) override { return Void(); }
-	Future<Void> start(Database const& cx) override { return Void(); }
+	Future<Void> start(Database const& cx) override { return clientId ? Void() : start(this, cx); }
 	Future<bool> check(Database const& cx) override { return true; }
 	std::string description() const override { return "ConfigurationDatabase"; }
 	void getMetrics(std::vector<PerfMetric>& m) override {}