mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
Create configuration database node files on demand
This commit is contained in:
parent
34681fb052
commit
739d8813dd
@ -70,6 +70,8 @@ set(FDBSERVER_SRCS
|
|||||||
OldTLogServer_4_6.actor.cpp
|
OldTLogServer_4_6.actor.cpp
|
||||||
OldTLogServer_6_0.actor.cpp
|
OldTLogServer_6_0.actor.cpp
|
||||||
OldTLogServer_6_2.actor.cpp
|
OldTLogServer_6_2.actor.cpp
|
||||||
|
OnDemandStore.actor.cpp
|
||||||
|
OnDemandStore.h
|
||||||
Orderer.actor.h
|
Orderer.actor.h
|
||||||
PaxosConfigConsumer.actor.cpp
|
PaxosConfigConsumer.actor.cpp
|
||||||
PaxosConfigConsumer.h
|
PaxosConfigConsumer.h
|
||||||
|
@ -59,18 +59,10 @@ class WriteToTransactionEnvironment {
|
|||||||
ConfigTransactionInterface cti;
|
ConfigTransactionInterface cti;
|
||||||
ConfigFollowerInterface cfi;
|
ConfigFollowerInterface cfi;
|
||||||
Reference<IConfigDatabaseNode> node;
|
Reference<IConfigDatabaseNode> node;
|
||||||
UID nodeID;
|
|
||||||
Future<Void> ctiServer;
|
Future<Void> ctiServer;
|
||||||
Future<Void> cfiServer;
|
Future<Void> cfiServer;
|
||||||
Version lastWrittenVersion{ 0 };
|
Version lastWrittenVersion{ 0 };
|
||||||
|
|
||||||
ACTOR static Future<Void> setupNode(WriteToTransactionEnvironment* self) {
|
|
||||||
wait(self->node->initialize("./", self->nodeID));
|
|
||||||
self->ctiServer = self->node->serve(self->cti);
|
|
||||||
self->cfiServer = self->node->serve(self->cfi);
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR static Future<Void> set(WriteToTransactionEnvironment* self, Optional<KeyRef> configClass, int64_t value) {
|
ACTOR static Future<Void> set(WriteToTransactionEnvironment* self, Optional<KeyRef> configClass, int64_t value) {
|
||||||
state Reference<IConfigTransaction> tr = IConfigTransaction::createSimple(self->cti);
|
state Reference<IConfigTransaction> tr = IConfigTransaction::createSimple(self->cti);
|
||||||
auto configKey = encodeConfigKey(configClass, "test_long"_sr);
|
auto configKey = encodeConfigKey(configClass, "test_long"_sr);
|
||||||
@ -89,11 +81,13 @@ class WriteToTransactionEnvironment {
|
|||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
void setup() {
|
||||||
WriteToTransactionEnvironment()
|
ctiServer = node->serve(cti);
|
||||||
: node(IConfigDatabaseNode::createSimple()), nodeID(deterministicRandom()->randomUniqueID()) {}
|
cfiServer = node->serve(cfi);
|
||||||
|
}
|
||||||
|
|
||||||
Future<Void> setup() { return setupNode(this); }
|
public:
|
||||||
|
WriteToTransactionEnvironment() : node(IConfigDatabaseNode::createSimple("./")) { setup(); }
|
||||||
|
|
||||||
Future<Void> set(Optional<KeyRef> configClass, int64_t value) { return set(this, configClass, value); }
|
Future<Void> set(Optional<KeyRef> configClass, int64_t value) { return set(this, configClass, value); }
|
||||||
|
|
||||||
@ -101,11 +95,11 @@ public:
|
|||||||
|
|
||||||
Future<Void> compact() { return cfi.compact.getReply(ConfigFollowerCompactRequest{ lastWrittenVersion }); }
|
Future<Void> compact() { return cfi.compact.getReply(ConfigFollowerCompactRequest{ lastWrittenVersion }); }
|
||||||
|
|
||||||
Future<Void> restartNode() {
|
void restartNode() {
|
||||||
cfiServer.cancel();
|
cfiServer.cancel();
|
||||||
ctiServer.cancel();
|
ctiServer.cancel();
|
||||||
node = IConfigDatabaseNode::createSimple();
|
node = IConfigDatabaseNode::createSimple("./");
|
||||||
return setupNode(this);
|
setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
ConfigTransactionInterface getTransactionInterface() const { return cti; }
|
ConfigTransactionInterface getTransactionInterface() const { return cti; }
|
||||||
@ -268,10 +262,10 @@ class TransactionEnvironment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Future<Void> setup() { return writeTo.setup(); }
|
// TODO: Remove this?
|
||||||
|
Future<Void> setup() { return Void(); }
|
||||||
Future<Void> restartNode() { return writeTo.restartNode(); }
|
|
||||||
|
|
||||||
|
void restartNode() { writeTo.restartNode(); }
|
||||||
Future<Void> set(Optional<KeyRef> configClass, int64_t value) { return writeTo.set(configClass, value); }
|
Future<Void> set(Optional<KeyRef> configClass, int64_t value) { return writeTo.set(configClass, value); }
|
||||||
Future<Void> clear(Optional<KeyRef> configClass) { return writeTo.clear(configClass); }
|
Future<Void> clear(Optional<KeyRef> configClass) { return writeTo.clear(configClass); }
|
||||||
Future<Void> check(Optional<KeyRef> configClass, Optional<int64_t> expected) {
|
Future<Void> check(Optional<KeyRef> configClass, Optional<int64_t> expected) {
|
||||||
@ -289,7 +283,6 @@ class TransactionToLocalConfigEnvironment {
|
|||||||
Future<Void> broadcastServer;
|
Future<Void> broadcastServer;
|
||||||
|
|
||||||
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self) {
|
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self) {
|
||||||
wait(self->writeTo.setup());
|
|
||||||
wait(self->readFrom.setup());
|
wait(self->readFrom.setup());
|
||||||
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi));
|
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi));
|
||||||
self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
|
self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
|
||||||
@ -303,7 +296,7 @@ public:
|
|||||||
|
|
||||||
Future<Void> setup() { return setup(this); }
|
Future<Void> setup() { return setup(this); }
|
||||||
|
|
||||||
Future<Void> restartNode() { return writeTo.restartNode(); }
|
void restartNode() { writeTo.restartNode(); }
|
||||||
|
|
||||||
void changeBroadcaster() {
|
void changeBroadcaster() {
|
||||||
broadcastServer.cancel();
|
broadcastServer.cancel();
|
||||||
@ -554,7 +547,7 @@ TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/RestartNode") {
|
|||||||
state TransactionToLocalConfigEnvironment env("class-A");
|
state TransactionToLocalConfigEnvironment env("class-A");
|
||||||
wait(env.setup());
|
wait(env.setup());
|
||||||
wait(set(env, "class-A"_sr, 1));
|
wait(set(env, "class-A"_sr, 1));
|
||||||
wait(env.restartNode());
|
env.restartNode();
|
||||||
wait(check(env, 1));
|
wait(check(env, 1));
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
@ -593,7 +586,7 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/Restart") {
|
|||||||
state TransactionEnvironment env;
|
state TransactionEnvironment env;
|
||||||
wait(env.setup());
|
wait(env.setup());
|
||||||
wait(set(env, "class-A"_sr, 1));
|
wait(set(env, "class-A"_sr, 1));
|
||||||
wait(env.restartNode());
|
env.restartNode();
|
||||||
wait(check(env, "class-A"_sr, 1));
|
wait(check(env, "class-A"_sr, 1));
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
#include "fdbserver/IConfigDatabaseNode.h"
|
#include "fdbserver/IConfigDatabaseNode.h"
|
||||||
#include "fdbserver/IKeyValueStore.h"
|
#include "fdbserver/IKeyValueStore.h"
|
||||||
#include "fdbserver/Knobs.h"
|
#include "fdbserver/Knobs.h"
|
||||||
|
#include "fdbserver/OnDemandStore.h"
|
||||||
#include "fdbserver/WorkerInterface.actor.h"
|
#include "fdbserver/WorkerInterface.actor.h"
|
||||||
#include "fdbserver/Status.h"
|
#include "fdbserver/Status.h"
|
||||||
#include "flow/ActorCollection.h"
|
#include "flow/ActorCollection.h"
|
||||||
@ -78,51 +79,6 @@ ServerCoordinators::ServerCoordinators(Reference<ClusterConnectionFile> cf) : Cl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The coordination server wants to create its key value store only if it is actually used
|
|
||||||
struct OnDemandStore {
|
|
||||||
public:
|
|
||||||
OnDemandStore(std::string folder, UID myID) : folder(folder), store(nullptr), myID(myID) {}
|
|
||||||
~OnDemandStore() {
|
|
||||||
if (store)
|
|
||||||
store->close();
|
|
||||||
}
|
|
||||||
|
|
||||||
IKeyValueStore* get() {
|
|
||||||
if (!store)
|
|
||||||
open();
|
|
||||||
return store;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool exists() const {
|
|
||||||
if (store)
|
|
||||||
return true;
|
|
||||||
return fileExists(joinPath(folder, "coordination-0.fdq")) ||
|
|
||||||
fileExists(joinPath(folder, "coordination-1.fdq")) || fileExists(joinPath(folder, "coordination.fdb"));
|
|
||||||
}
|
|
||||||
|
|
||||||
IKeyValueStore* operator->() { return get(); }
|
|
||||||
|
|
||||||
Future<Void> getError() const { return onErr(err.getFuture()); }
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::string folder;
|
|
||||||
UID myID;
|
|
||||||
IKeyValueStore* store;
|
|
||||||
Promise<Future<Void>> err;
|
|
||||||
|
|
||||||
ACTOR static Future<Void> onErr(Future<Future<Void>> e) {
|
|
||||||
Future<Void> f = wait(e);
|
|
||||||
wait(f);
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
void open() {
|
|
||||||
platform::createDirectory(folder);
|
|
||||||
store = keyValueStoreMemory(joinPath(folder, "coordination-"), myID, 500e6);
|
|
||||||
err.send(store->getError());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf, OnDemandStore* pstore) {
|
ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf, OnDemandStore* pstore) {
|
||||||
state GenerationRegVal v;
|
state GenerationRegVal v;
|
||||||
state OnDemandStore& store = *pstore;
|
state OnDemandStore& store = *pstore;
|
||||||
@ -180,7 +136,8 @@ ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf, OnDemandSto
|
|||||||
TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
|
TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
|
||||||
state GenerationRegInterface reg;
|
state GenerationRegInterface reg;
|
||||||
state OnDemandStore store("simfdb/unittests/", //< FIXME
|
state OnDemandStore store("simfdb/unittests/", //< FIXME
|
||||||
deterministicRandom()->randomUniqueID());
|
deterministicRandom()->randomUniqueID(),
|
||||||
|
"coordination");
|
||||||
state Future<Void> actor = localGenerationReg(reg, &store);
|
state Future<Void> actor = localGenerationReg(reg, &store);
|
||||||
state Key the_key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, 10)));
|
state Key the_key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, 10)));
|
||||||
|
|
||||||
@ -618,7 +575,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder, Optional<bool> use
|
|||||||
state UID myID = deterministicRandom()->randomUniqueID();
|
state UID myID = deterministicRandom()->randomUniqueID();
|
||||||
state LeaderElectionRegInterface myLeaderInterface(g_network);
|
state LeaderElectionRegInterface myLeaderInterface(g_network);
|
||||||
state GenerationRegInterface myInterface(g_network);
|
state GenerationRegInterface myInterface(g_network);
|
||||||
state OnDemandStore store(dataFolder, myID);
|
state OnDemandStore store(dataFolder, myID, "coordination");
|
||||||
state ConfigTransactionInterface configTransactionInterface;
|
state ConfigTransactionInterface configTransactionInterface;
|
||||||
state ConfigFollowerInterface configFollowerInterface;
|
state ConfigFollowerInterface configFollowerInterface;
|
||||||
state Reference<IConfigDatabaseNode> configDatabaseNode;
|
state Reference<IConfigDatabaseNode> configDatabaseNode;
|
||||||
@ -631,11 +588,10 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder, Optional<bool> use
|
|||||||
configTransactionInterface.setupWellKnownEndpoints();
|
configTransactionInterface.setupWellKnownEndpoints();
|
||||||
configFollowerInterface.setupWellKnownEndpoints();
|
configFollowerInterface.setupWellKnownEndpoints();
|
||||||
if (useTestConfigDB.get()) {
|
if (useTestConfigDB.get()) {
|
||||||
configDatabaseNode = IConfigDatabaseNode::createSimple();
|
configDatabaseNode = IConfigDatabaseNode::createSimple(dataFolder);
|
||||||
} else {
|
} else {
|
||||||
configDatabaseNode = IConfigDatabaseNode::createPaxos();
|
configDatabaseNode = IConfigDatabaseNode::createPaxos(dataFolder);
|
||||||
}
|
}
|
||||||
wait(configDatabaseNode->initialize(dataFolder, UID{}));
|
|
||||||
configDatabaseServer =
|
configDatabaseServer =
|
||||||
configDatabaseNode->serve(configTransactionInterface) || configDatabaseNode->serve(configFollowerInterface);
|
configDatabaseNode->serve(configTransactionInterface) || configDatabaseNode->serve(configFollowerInterface);
|
||||||
}
|
}
|
||||||
|
@ -22,10 +22,10 @@
|
|||||||
#include "fdbserver/PaxosConfigDatabaseNode.h"
|
#include "fdbserver/PaxosConfigDatabaseNode.h"
|
||||||
#include "fdbserver/SimpleConfigDatabaseNode.h"
|
#include "fdbserver/SimpleConfigDatabaseNode.h"
|
||||||
|
|
||||||
Reference<IConfigDatabaseNode> IConfigDatabaseNode::createSimple() {
|
Reference<IConfigDatabaseNode> IConfigDatabaseNode::createSimple(std::string const& folder) {
|
||||||
return makeReference<SimpleConfigDatabaseNode>();
|
return makeReference<SimpleConfigDatabaseNode>(folder);
|
||||||
}
|
}
|
||||||
|
|
||||||
Reference<IConfigDatabaseNode> IConfigDatabaseNode::createPaxos() {
|
Reference<IConfigDatabaseNode> IConfigDatabaseNode::createPaxos(std::string const& folder) {
|
||||||
return makeReference<PaxosConfigDatabaseNode>();
|
return makeReference<PaxosConfigDatabaseNode>(folder);
|
||||||
}
|
}
|
||||||
|
@ -31,8 +31,7 @@ class IConfigDatabaseNode : public ReferenceCounted<IConfigDatabaseNode> {
|
|||||||
public:
|
public:
|
||||||
virtual Future<Void> serve(ConfigTransactionInterface const&) = 0;
|
virtual Future<Void> serve(ConfigTransactionInterface const&) = 0;
|
||||||
virtual Future<Void> serve(ConfigFollowerInterface const&) = 0;
|
virtual Future<Void> serve(ConfigFollowerInterface const&) = 0;
|
||||||
virtual Future<Void> initialize(std::string const& dataFolder, UID id) = 0;
|
|
||||||
|
|
||||||
static Reference<IConfigDatabaseNode> createSimple();
|
static Reference<IConfigDatabaseNode> createSimple(std::string const& folder);
|
||||||
static Reference<IConfigDatabaseNode> createPaxos();
|
static Reference<IConfigDatabaseNode> createPaxos(std::string const& folder);
|
||||||
};
|
};
|
||||||
|
63
fdbserver/OnDemandStore.actor.cpp
Normal file
63
fdbserver/OnDemandStore.actor.cpp
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
/*
|
||||||
|
* OnDemandStore.actor.cpp
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "fdbserver/OnDemandStore.h"
|
||||||
|
#include "flow/actorcompiler.h" // must be last include
|
||||||
|
|
||||||
|
ACTOR static Future<Void> onErr(Future<Future<Void>> e) {
|
||||||
|
Future<Void> f = wait(e);
|
||||||
|
wait(f);
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnDemandStore::open() {
|
||||||
|
platform::createDirectory(folder);
|
||||||
|
store = keyValueStoreMemory(joinPath(folder, prefix), myID, 500e6);
|
||||||
|
err.send(store->getError());
|
||||||
|
}
|
||||||
|
|
||||||
|
OnDemandStore::OnDemandStore(std::string const& folder, UID myID, std::string const& prefix)
|
||||||
|
: folder(folder), prefix(prefix), store(nullptr), myID(myID) {}
|
||||||
|
|
||||||
|
OnDemandStore::~OnDemandStore() {
|
||||||
|
if (store) {
|
||||||
|
store->close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
IKeyValueStore* OnDemandStore::get() {
|
||||||
|
if (!store) {
|
||||||
|
open();
|
||||||
|
}
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool OnDemandStore::exists() const {
|
||||||
|
return store || fileExists(joinPath(folder, prefix + "-0.fdq")) ||
|
||||||
|
fileExists(joinPath(folder, prefix + "-1.fdq")) || fileExists(joinPath(folder, prefix + ".fdb"));
|
||||||
|
}
|
||||||
|
|
||||||
|
IKeyValueStore* OnDemandStore::operator->() {
|
||||||
|
return get();
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> OnDemandStore::getError() const {
|
||||||
|
return onErr(err.getFuture());
|
||||||
|
}
|
44
fdbserver/OnDemandStore.h
Normal file
44
fdbserver/OnDemandStore.h
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
/*
|
||||||
|
* OnDemandStore.h
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "flow/Arena.h"
|
||||||
|
#include "flow/IRandom.h"
|
||||||
|
#include "flow/Platform.h"
|
||||||
|
#include "fdbserver/IKeyValueStore.h"
|
||||||
|
|
||||||
|
// Create a key value store if and only if it is actually used
|
||||||
|
class OnDemandStore : NonCopyable {
|
||||||
|
std::string folder;
|
||||||
|
UID myID;
|
||||||
|
IKeyValueStore* store;
|
||||||
|
Promise<Future<Void>> err;
|
||||||
|
std::string prefix;
|
||||||
|
void open();
|
||||||
|
|
||||||
|
public:
|
||||||
|
OnDemandStore(std::string const& folder, UID myID, std::string const& prefix);
|
||||||
|
~OnDemandStore();
|
||||||
|
IKeyValueStore* get();
|
||||||
|
bool exists() const;
|
||||||
|
IKeyValueStore* operator->();
|
||||||
|
Future<Void> getError() const;
|
||||||
|
};
|
@ -22,7 +22,10 @@
|
|||||||
|
|
||||||
class PaxosConfigDatabaseNodeImpl {};
|
class PaxosConfigDatabaseNodeImpl {};
|
||||||
|
|
||||||
PaxosConfigDatabaseNode::PaxosConfigDatabaseNode() : impl(std::make_unique<PaxosConfigDatabaseNodeImpl>()) {}
|
PaxosConfigDatabaseNode::PaxosConfigDatabaseNode(std::string const& folder) {
|
||||||
|
// TODO: Implement
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
PaxosConfigDatabaseNode::~PaxosConfigDatabaseNode() = default;
|
PaxosConfigDatabaseNode::~PaxosConfigDatabaseNode() = default;
|
||||||
|
|
||||||
@ -37,9 +40,3 @@ Future<Void> PaxosConfigDatabaseNode::serve(ConfigFollowerInterface const& cfi)
|
|||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> PaxosConfigDatabaseNode::initialize(std::string const& dataFolder, UID id) {
|
|
||||||
// TODO: Implement
|
|
||||||
ASSERT(false);
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
@ -26,9 +26,8 @@ class PaxosConfigDatabaseNode : public IConfigDatabaseNode {
|
|||||||
std::unique_ptr<class PaxosConfigDatabaseNodeImpl> impl;
|
std::unique_ptr<class PaxosConfigDatabaseNodeImpl> impl;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PaxosConfigDatabaseNode();
|
PaxosConfigDatabaseNode(std::string const& folder);
|
||||||
~PaxosConfigDatabaseNode();
|
~PaxosConfigDatabaseNode();
|
||||||
Future<Void> serve(ConfigTransactionInterface const&) override;
|
Future<Void> serve(ConfigTransactionInterface const&) override;
|
||||||
Future<Void> serve(ConfigFollowerInterface const&) override;
|
Future<Void> serve(ConfigFollowerInterface const&) override;
|
||||||
Future<Void> initialize(std::string const& dataFolder, UID id) override;
|
|
||||||
};
|
};
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
|
|
||||||
#include "fdbserver/SimpleConfigDatabaseNode.h"
|
#include "fdbserver/SimpleConfigDatabaseNode.h"
|
||||||
#include "fdbserver/IKeyValueStore.h"
|
#include "fdbserver/IKeyValueStore.h"
|
||||||
|
#include "fdbserver/OnDemandStore.h"
|
||||||
#include "flow/Arena.h"
|
#include "flow/Arena.h"
|
||||||
#include "flow/genericactors.actor.h"
|
#include "flow/genericactors.actor.h"
|
||||||
#include "flow/UnitTest.h"
|
#include "flow/UnitTest.h"
|
||||||
@ -82,11 +83,11 @@ TEST_CASE("/fdbserver/ConfigDB/SimpleConfigDatabaseNode/Internal/versionedMutati
|
|||||||
}
|
}
|
||||||
|
|
||||||
class SimpleConfigDatabaseNodeImpl {
|
class SimpleConfigDatabaseNodeImpl {
|
||||||
IKeyValueStore* kvStore; // FIXME: Prevent leak
|
|
||||||
std::map<std::string, std::string> config;
|
|
||||||
Future<Void> initFuture;
|
|
||||||
|
|
||||||
UID id;
|
UID id;
|
||||||
|
// TODO: Listen for errors
|
||||||
|
OnDemandStore kvStore;
|
||||||
|
std::map<std::string, std::string> config;
|
||||||
|
|
||||||
CounterCollection cc;
|
CounterCollection cc;
|
||||||
|
|
||||||
// Follower counters
|
// Follower counters
|
||||||
@ -282,7 +283,6 @@ class SimpleConfigDatabaseNodeImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionInterface const* cti) {
|
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionInterface const* cti) {
|
||||||
ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
|
|
||||||
loop {
|
loop {
|
||||||
//wait(traceQueuedMutations(self));
|
//wait(traceQueuedMutations(self));
|
||||||
choose {
|
choose {
|
||||||
@ -367,7 +367,6 @@ class SimpleConfigDatabaseNodeImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerInterface const* cfi) {
|
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerInterface const* cfi) {
|
||||||
ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
|
|
||||||
loop {
|
loop {
|
||||||
choose {
|
choose {
|
||||||
when(ConfigFollowerGetSnapshotAndChangesRequest req =
|
when(ConfigFollowerGetSnapshotAndChangesRequest req =
|
||||||
@ -387,35 +386,24 @@ class SimpleConfigDatabaseNodeImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
SimpleConfigDatabaseNodeImpl()
|
SimpleConfigDatabaseNodeImpl(std::string const& folder)
|
||||||
: cc("ConfigDatabaseNode"), compactRequests("CompactRequests", cc),
|
: id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "global-conf"), cc("ConfigDatabaseNode"),
|
||||||
successfulChangeRequests("SuccessfulChangeRequests", cc), failedChangeRequests("FailedChangeRequests", cc),
|
compactRequests("CompactRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
|
||||||
snapshotRequests("SnapshotRequests", cc), successfulCommits("SuccessfulCommits", cc),
|
failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc),
|
||||||
failedCommits("FailedCommits", cc), setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
|
successfulCommits("SuccessfulCommits", cc), failedCommits("FailedCommits", cc),
|
||||||
getValueRequests("GetValueRequests", cc), newVersionRequests("NewVersionRequests", cc) {}
|
setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
|
||||||
|
getValueRequests("GetValueRequests", cc), newVersionRequests("NewVersionRequests", cc) {
|
||||||
~SimpleConfigDatabaseNodeImpl() {
|
logger = traceCounters(
|
||||||
if (kvStore) {
|
"ConfigDatabaseNodeMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigDatabaseNode");
|
||||||
kvStore->close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); }
|
Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); }
|
||||||
|
|
||||||
Future<Void> serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); }
|
Future<Void> serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); }
|
||||||
|
|
||||||
Future<Void> initialize(std::string const& dataFolder, UID id) {
|
|
||||||
platform::createDirectory(dataFolder);
|
|
||||||
this->id = id;
|
|
||||||
kvStore = keyValueStoreMemory(joinPath(dataFolder, "globalconf-" + id.toString()), id, 500e6);
|
|
||||||
logger = traceCounters(
|
|
||||||
"ConfigDatabaseNodeMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigDatabaseNode");
|
|
||||||
initFuture = kvStore->init();
|
|
||||||
return initFuture;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
SimpleConfigDatabaseNode::SimpleConfigDatabaseNode() : impl(std::make_unique<SimpleConfigDatabaseNodeImpl>()) {}
|
SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& folder)
|
||||||
|
: impl(std::make_unique<SimpleConfigDatabaseNodeImpl>(folder)) {}
|
||||||
|
|
||||||
SimpleConfigDatabaseNode::~SimpleConfigDatabaseNode() = default;
|
SimpleConfigDatabaseNode::~SimpleConfigDatabaseNode() = default;
|
||||||
|
|
||||||
@ -426,7 +414,3 @@ Future<Void> SimpleConfigDatabaseNode::serve(ConfigTransactionInterface const& c
|
|||||||
Future<Void> SimpleConfigDatabaseNode::serve(ConfigFollowerInterface const& cfi) {
|
Future<Void> SimpleConfigDatabaseNode::serve(ConfigFollowerInterface const& cfi) {
|
||||||
return impl->serve(cfi);
|
return impl->serve(cfi);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> SimpleConfigDatabaseNode::initialize(std::string const& dataFolder, UID id) {
|
|
||||||
return impl->initialize(dataFolder, id);
|
|
||||||
}
|
|
||||||
|
@ -26,9 +26,8 @@ class SimpleConfigDatabaseNode : public IConfigDatabaseNode {
|
|||||||
std::unique_ptr<class SimpleConfigDatabaseNodeImpl> impl;
|
std::unique_ptr<class SimpleConfigDatabaseNodeImpl> impl;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
SimpleConfigDatabaseNode();
|
SimpleConfigDatabaseNode(std::string const& folder);
|
||||||
~SimpleConfigDatabaseNode();
|
~SimpleConfigDatabaseNode();
|
||||||
Future<Void> serve(ConfigTransactionInterface const&) override;
|
Future<Void> serve(ConfigTransactionInterface const&) override;
|
||||||
Future<Void> serve(ConfigFollowerInterface const&) override;
|
Future<Void> serve(ConfigFollowerInterface const&) override;
|
||||||
Future<Void> initialize(std::string const& dataFolder, UID id) override;
|
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user