Implemented simple ConfigurationDatabase workload, run simple configuration database node on coordinators

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-15 12:44:45 -07:00
parent 18f17a4ea2
commit ca3b7f5ef1
8 changed files with 68 additions and 19 deletions

View File

@ -79,11 +79,11 @@ struct ConfigDatabaseGetRequest {
struct ConfigDatabaseCommitRequest {
static constexpr FileIdentifier file_identifier = 103841;
Version version;
VectorRef<MutationRef> mutations;
Standalone<VectorRef<MutationRef>> mutations;
ReplyPromise<Void> reply;
ConfigDatabaseCommitRequest() = default;
ConfigDatabaseCommitRequest(Version version, VectorRef<MutationRef> mutations)
ConfigDatabaseCommitRequest(Version version, Standalone<VectorRef<MutationRef>> mutations)
: version(version), mutations(mutations) {}
template <class Ar>
@ -98,7 +98,9 @@ struct ConfigDatabaseInterface {
struct RequestStream<ConfigDatabaseGetRequest> get;
struct RequestStream<ConfigDatabaseCommitRequest> commit;
ConfigDatabaseInterface() {
ConfigDatabaseInterface() = default;
void setupWellKnownEndpoints() {
getVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_GETVERSION, TaskPriority::Coordination);
get.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_GET, TaskPriority::Coordination);
commit.makeWellKnownEndpoint(WLTOKEN_CONFIGDB_COMMIT, TaskPriority::Coordination);
@ -106,7 +108,7 @@ struct ConfigDatabaseInterface {
ConfigDatabaseInterface(NetworkAddress const& remote)
: getVersion(Endpoint({ remote }, WLTOKEN_CONFIGDB_GETVERSION)), get(Endpoint({ remote }, WLTOKEN_CONFIGDB_GET)),
commit(Endpoint({ remote }, WLTOKEN_CONFIGDB_GET)) {}
commit(Endpoint({ remote }, WLTOKEN_CONFIGDB_COMMIT)) {}
template <class Ar>
void serialize(Ar& ar) {

View File

@ -35,6 +35,7 @@ public:
virtual Future<Optional<Value>> get(KeyRef) = 0;
virtual Future<Void> commit() = 0;
virtual Future<Void> onError(Error const&) = 0;
virtual void reset() = 0;
};
class SimpleConfigurationTransaction : public IConfigurationTransaction {
@ -48,4 +49,5 @@ public:
Future<Optional<Value>> get(KeyRef) override;
Future<Void> commit() override;
Future<Void> onError(Error const&) override;
void reset() override;
};

View File

@ -26,8 +26,7 @@
#include "flow/actorcompiler.h" // This must be the last #include.
class SimpleConfigurationTransactionImpl {
Arena arena;
VectorRef<MutationRef> mutations;
Standalone<VectorRef<MutationRef>> mutations;
Future<Version> version;
ConfigDatabaseInterface cdbi;
@ -62,17 +61,24 @@ public:
cdbi = ConfigDatabaseInterface(coordinators[0]);
}
void set(KeyRef key, ValueRef value) { mutations.emplace_back(arena, MutationRef::Type::SetValue, key, value); }
void set(KeyRef key, ValueRef value) {
mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::SetValue, key, value);
}
void clearRange(KeyRef begin, KeyRef end) {
mutations.emplace_back(arena, MutationRef::Type::ClearRange, begin, end);
mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::ClearRange, begin, end);
}
Future<Optional<Value>> get(KeyRef key) { return get(this, key); }
Future<Void> commit() { return commit(this); }
Future<Void> onError(Error const& e) { return Void(); }
Future<Void> onError(Error const& e) { throw e; }
void reset() {
version.cancel();
mutations = Standalone<VectorRef<MutationRef>>{};
}
};
void SimpleConfigurationTransaction::set(KeyRef key, ValueRef value) {
@ -95,6 +101,10 @@ Future<Void> SimpleConfigurationTransaction::onError(Error const& e) {
return impl->onError(e);
}
void SimpleConfigurationTransaction::reset() {
return impl->reset();
}
SimpleConfigurationTransaction::SimpleConfigurationTransaction(ClusterConnectionString const& ccs)
: impl(std::make_unique<SimpleConfigurationTransactionImpl>(ccs)) {}

View File

@ -20,6 +20,7 @@
#include "fdbclient/ConfigDatabaseInterface.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/IConfigDatabaseNode.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -618,6 +619,8 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
state GenerationRegInterface myInterface(g_network);
state OnDemandStore store(dataFolder, myID);
state ConfigDatabaseInterface configDatabaseInterface;
configDatabaseInterface.setupWellKnownEndpoints();
state Reference<IConfigDatabaseNode> configDatabaseNode = makeReference<SimpleConfigDatabaseNode>(dataFolder);
TraceEvent("CoordinationServer", myID)
.detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress())
@ -625,7 +628,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
try {
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) ||
store.getError());
store.getError() || configDatabaseNode->serve(configDatabaseInterface));
throw internal_error();
} catch (Error& e) {
TraceEvent("CoordinationServerError", myID).error(e, true);

View File

@ -21,11 +21,12 @@
#pragma once
#include "fdbclient/ConfigDatabaseInterface.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
#include <memory>
class IConfigDatabaseNode {
class IConfigDatabaseNode : public ReferenceCounted<IConfigDatabaseNode> {
public:
virtual Future<Void> serve(ConfigDatabaseInterface&) = 0;
};
@ -34,7 +35,7 @@ class SimpleConfigDatabaseNode : public IConfigDatabaseNode {
std::unique_ptr<class SimpleConfigDatabaseNodeImpl> impl;
public:
SimpleConfigDatabaseNode(std::string const& fileName);
SimpleConfigDatabaseNode(std::string const& dataFolder);
~SimpleConfigDatabaseNode();
Future<Void> serve(ConfigDatabaseInterface&) override;
};

View File

@ -75,8 +75,11 @@ class SimpleConfigDatabaseNodeImpl {
for (; index < req.mutations.size(); ++index) {
const auto& mutation = req.mutations[index];
if (mutation.type == MutationRef::SetValue) {
self->config[mutation.param1.toString()] = mutation.param2.toString();
self->kvStore->set(KeyValueRef(mutation.param1, mutation.param2));
} else if (mutation.type == MutationRef::ClearRange) {
self->config.erase(self->config.find(mutation.param1.toString()),
self->config.find(mutation.param2.toString()));
self->kvStore->clear(KeyRangeRef(mutation.param1, mutation.param2));
} else {
ASSERT(false);
@ -89,8 +92,9 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> readKVStoreIntoMemory(SimpleConfigDatabaseNodeImpl* self) {
wait(self->kvStore->init());
state Optional<Value> onDiskVersion = wait(self->kvStore->readValue(versionKey));
if (onDiskVersion.present()) {
if (!onDiskVersion.present()) {
// Brand new database
self->currentVersion = 0;
BinaryWriter wr(IncludeVersion());
@ -109,7 +113,6 @@ class SimpleConfigDatabaseNodeImpl {
return Void();
}
public:
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigDatabaseInterface* cdbi) {
wait(readKVStoreIntoMemory(self));
loop {
@ -129,14 +132,17 @@ public:
}
public:
SimpleConfigDatabaseNodeImpl(std::string const& fileName)
: kvStore(keyValueStoreSQLite(fileName, UID{}, KeyValueStoreType::SSD_BTREE_V2, true, true)) {}
SimpleConfigDatabaseNodeImpl(std::string const& dataFolder) {
platform::createDirectory(dataFolder);
kvStore =
keyValueStoreMemory(joinPath(dataFolder, "globalconf-"), deterministicRandom()->randomUniqueID(), 500e6);
}
Future<Void> serve(ConfigDatabaseInterface& cdbi) { return serve(this, &cdbi); }
};
SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& fileName)
: impl(std::make_unique<SimpleConfigDatabaseNodeImpl>(fileName)) {}
SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& dataFolder)
: impl(std::make_unique<SimpleConfigDatabaseNodeImpl>(dataFolder)) {}
SimpleConfigDatabaseNode::~SimpleConfigDatabaseNode() = default;

View File

@ -275,6 +275,7 @@ StringRef fileStoragePrefix = LiteralStringRef("storage-");
StringRef fileLogDataPrefix = LiteralStringRef("log-");
StringRef fileVersionedLogDataPrefix = LiteralStringRef("log2-");
StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-");
StringRef globalConfPrefix = LiteralStringRef("globalconf-");
StringRef tlogQueueExtension = LiteralStringRef("fdq");
enum class FilesystemCheck {

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/IConfigurationDatabase.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@ -25,10 +26,33 @@
#include "flow/actorcompiler.h" // has to be last include
class ConfigurationDatabaseWorkload : public TestWorkload {
ACTOR static Future<Void> start(ConfigurationDatabaseWorkload* self, Database cx) {
state SimpleConfigurationTransaction tr(cx->getConnectionFile()->getConnectionString());
state Key k = LiteralStringRef("config/x");
state Key v = LiteralStringRef("x");
{
Optional<Value> currentValue = wait(tr.get(k));
ASSERT(!currentValue.present());
}
wait(delay(1));
{
tr.reset();
tr.set(k, v);
wait(tr.commit());
}
wait(delay(1));
{
tr.reset();
Optional<Value> currentValue = wait(tr.get(k));
ASSERT(currentValue.get() == v);
}
return Void();
}
public:
ConfigurationDatabaseWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return clientId ? Void() : start(this, cx); }
Future<bool> check(Database const& cx) override { return true; }
std::string description() const override { return "ConfigurationDatabase"; }
void getMetrics(std::vector<PerfMetric>& m) override {}