Merge pull request #5222 from sfc-gh-tclinkenbeard/paxos-config-db

Simple (non-fault tolerant) implementation of replicated configuration database
This commit is contained in:
Trevor Clinkenbeard 2021-08-02 10:03:24 -07:00 committed by GitHub
commit 5e1639ad32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 712 additions and 539 deletions

View File

@ -20,12 +20,13 @@
#include "fdbclient/ConfigTransactionInterface.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/SystemData.h"
#include "flow/IRandom.h"
ConfigTransactionInterface::ConfigTransactionInterface() : _id(deterministicRandom()->randomUniqueID()) {}
void ConfigTransactionInterface::setupWellKnownEndpoints() {
getVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GETVERSION, TaskPriority::Coordination);
getGeneration.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GETGENERATION, TaskPriority::Coordination);
get.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GET, TaskPriority::Coordination);
getClasses.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GETCLASSES, TaskPriority::Coordination);
getKnobs.makeWellKnownEndpoint(WLTOKEN_CONFIGTXN_GETKNOBS, TaskPriority::Coordination);
@ -33,8 +34,8 @@ void ConfigTransactionInterface::setupWellKnownEndpoints() {
}
ConfigTransactionInterface::ConfigTransactionInterface(NetworkAddress const& remote)
: getVersion(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETVERSION)), get(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GET)),
getClasses(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETCLASSES)),
: getGeneration(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)),
get(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GET)), getClasses(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETCLASSES)),
getKnobs(Endpoint({ remote }, WLTOKEN_CONFIGTXN_GETKNOBS)), commit(Endpoint({ remote }, WLTOKEN_CONFIGTXN_COMMIT)) {
}
@ -45,3 +46,30 @@ bool ConfigTransactionInterface::operator==(ConfigTransactionInterface const& rh
bool ConfigTransactionInterface::operator!=(ConfigTransactionInterface const& rhs) const {
return !(*this == rhs);
}
bool ConfigGeneration::operator==(ConfigGeneration const& rhs) const {
return liveVersion == rhs.liveVersion && committedVersion == rhs.committedVersion;
}
bool ConfigGeneration::operator!=(ConfigGeneration const& rhs) const {
return !(*this == rhs);
}
void ConfigTransactionCommitRequest::set(KeyRef key, ValueRef value) {
if (key == configTransactionDescriptionKey) {
annotation.description = KeyRef(arena, value);
} else {
ConfigKey configKey = ConfigKeyRef::decodeKey(key);
auto knobValue = IKnobCollection::parseKnobValue(
configKey.knobName.toString(), value.toString(), IKnobCollection::Type::TEST);
mutations.emplace_back_deep(arena, configKey, knobValue.contents());
}
}
void ConfigTransactionCommitRequest::clear(KeyRef key) {
if (key == configTransactionDescriptionKey) {
annotation.description = ""_sr;
} else {
mutations.emplace_back_deep(arena, ConfigKeyRef::decodeKey(key), Optional<KnobValueRef>{});
}
}

View File

@ -27,22 +27,38 @@
#include "fdbrpc/fdbrpc.h"
#include "flow/flow.h"
struct ConfigTransactionGetVersionReply {
static constexpr FileIdentifier file_identifier = 2934851;
ConfigTransactionGetVersionReply() = default;
explicit ConfigTransactionGetVersionReply(Version version) : version(version) {}
Version version;
struct ConfigGeneration {
// The live version of each node is monotonically increasing
Version liveVersion{ 0 };
// The committedVersion of each node is the version of the last commit made durable.
// Each committedVersion was previously given to clients as a liveVersion, prior to commit.
Version committedVersion{ 0 };
bool operator==(ConfigGeneration const&) const;
bool operator!=(ConfigGeneration const&) const;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version);
serializer(ar, liveVersion, committedVersion);
}
};
struct ConfigTransactionGetVersionRequest {
struct ConfigTransactionGetGenerationReply {
static constexpr FileIdentifier file_identifier = 2934851;
ConfigTransactionGetGenerationReply() = default;
explicit ConfigTransactionGetGenerationReply(ConfigGeneration generation) : generation(generation) {}
ConfigGeneration generation;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, generation);
}
};
struct ConfigTransactionGetGenerationRequest {
static constexpr FileIdentifier file_identifier = 138941;
ReplyPromise<ConfigTransactionGetVersionReply> reply;
ConfigTransactionGetVersionRequest() = default;
ReplyPromise<ConfigTransactionGetGenerationReply> reply;
ConfigTransactionGetGenerationRequest() = default;
template <class Ar>
void serialize(Ar& ar) {
@ -64,45 +80,36 @@ struct ConfigTransactionGetReply {
struct ConfigTransactionGetRequest {
static constexpr FileIdentifier file_identifier = 923040;
Version version;
ConfigGeneration generation;
ConfigKey key;
ReplyPromise<ConfigTransactionGetReply> reply;
ConfigTransactionGetRequest() = default;
explicit ConfigTransactionGetRequest(Version version, ConfigKey key) : version(version), key(key) {}
explicit ConfigTransactionGetRequest(ConfigGeneration generation, ConfigKey key)
: generation(generation), key(key) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, key, reply);
serializer(ar, generation, key, reply);
}
};
struct ConfigTransactionCommitRequest {
static constexpr FileIdentifier file_identifier = 103841;
Arena arena;
Version version{ ::invalidVersion };
ConfigGeneration generation{ ::invalidVersion, ::invalidVersion };
VectorRef<ConfigMutationRef> mutations;
ConfigCommitAnnotationRef annotation;
ReplyPromise<Void> reply;
size_t expectedSize() const { return mutations.expectedSize() + annotation.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, version, mutations, annotation, reply);
}
};
struct ConfigTransactionGetRangeReply {
static constexpr FileIdentifier file_identifier = 430263;
Standalone<RangeResultRef> range;
ConfigTransactionGetRangeReply() = default;
explicit ConfigTransactionGetRangeReply(Standalone<RangeResultRef> range) : range(range) {}
void set(KeyRef key, ValueRef value);
void clear(KeyRef key);
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, range);
serializer(ar, arena, generation, mutations, annotation, reply);
}
};
@ -122,15 +129,15 @@ struct ConfigTransactionGetConfigClassesReply {
struct ConfigTransactionGetConfigClassesRequest {
static constexpr FileIdentifier file_identifier = 7163400;
Version version;
ConfigGeneration generation;
ReplyPromise<ConfigTransactionGetConfigClassesReply> reply;
ConfigTransactionGetConfigClassesRequest() = default;
explicit ConfigTransactionGetConfigClassesRequest(Version version) : version(version) {}
explicit ConfigTransactionGetConfigClassesRequest(ConfigGeneration generation) : generation(generation) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version);
serializer(ar, generation);
}
};
@ -149,17 +156,17 @@ struct ConfigTransactionGetKnobsReply {
struct ConfigTransactionGetKnobsRequest {
static constexpr FileIdentifier file_identifier = 987410;
Version version;
ConfigGeneration generation;
Optional<Key> configClass;
ReplyPromise<ConfigTransactionGetKnobsReply> reply;
ConfigTransactionGetKnobsRequest() = default;
explicit ConfigTransactionGetKnobsRequest(Version version, Optional<Key> configClass)
: version(version), configClass(configClass) {}
explicit ConfigTransactionGetKnobsRequest(ConfigGeneration generation, Optional<Key> configClass)
: generation(generation), configClass(configClass) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, configClass, reply);
serializer(ar, generation, configClass, reply);
}
};
@ -172,7 +179,7 @@ struct ConfigTransactionInterface {
public:
static constexpr FileIdentifier file_identifier = 982485;
struct RequestStream<ConfigTransactionGetVersionRequest> getVersion;
struct RequestStream<ConfigTransactionGetGenerationRequest> getGeneration;
struct RequestStream<ConfigTransactionGetRequest> get;
struct RequestStream<ConfigTransactionGetConfigClassesRequest> getClasses;
struct RequestStream<ConfigTransactionGetKnobsRequest> getKnobs;
@ -188,6 +195,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, getVersion, get, getClasses, getKnobs, commit);
serializer(ar, getGeneration, get, getClasses, getKnobs, commit);
}
};

View File

@ -38,7 +38,7 @@ constexpr UID WLTOKEN_CLIENTLEADERREG_OPENDATABASE(-1, 3);
constexpr UID WLTOKEN_PROTOCOL_INFO(-1, 10);
constexpr UID WLTOKEN_CLIENTLEADERREG_DESCRIPTOR_MUTABLE(-1, 11);
constexpr UID WLTOKEN_CONFIGTXN_GETVERSION(-1, 12);
constexpr UID WLTOKEN_CONFIGTXN_GETGENERATION(-1, 12);
constexpr UID WLTOKEN_CONFIGTXN_GET(-1, 13);
constexpr UID WLTOKEN_CONFIGTXN_GETCLASSES(-1, 14);
constexpr UID WLTOKEN_CONFIGTXN_GETKNOBS(-1, 15);

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <vector>
#include "fdbclient/IConfigTransaction.h"
#include "fdbclient/SimpleConfigTransaction.h"
#include "fdbclient/PaxosConfigTransaction.h"
@ -25,3 +27,7 @@
Reference<IConfigTransaction> IConfigTransaction::createTestSimple(ConfigTransactionInterface const& cti) {
return makeReference<SimpleConfigTransaction>(cti);
}
Reference<IConfigTransaction> IConfigTransaction::createTestPaxos(std::vector<ConfigTransactionInterface> const& ctis) {
return makeReference<PaxosConfigTransaction>(ctis);
}

View File

@ -40,6 +40,7 @@ public:
virtual ~IConfigTransaction() = default;
static Reference<IConfigTransaction> createTestSimple(ConfigTransactionInterface const&);
static Reference<IConfigTransaction> createTestPaxos(std::vector<ConfigTransactionInterface> const&);
// Not implemented:
void setVersion(Version) override { throw client_invalid_operation(); }

View File

@ -52,16 +52,16 @@ public:
virtual Optional<Version> getCachedReadVersion() const = 0;
virtual Future<Optional<Value>> get(const Key& key, Snapshot = Snapshot::False) = 0;
virtual Future<Key> getKey(const KeySelector& key, Snapshot = Snapshot::False) = 0;
virtual Future<Standalone<RangeResultRef>> getRange(const KeySelector& begin,
const KeySelector& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) = 0;
virtual Future<Standalone<RangeResultRef>> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) = 0;
virtual Future<RangeResult> getRange(const KeySelector& begin,
const KeySelector& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) = 0;
virtual Future<RangeResult> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) = 0;
virtual Future<Standalone<VectorRef<const char*>>> getAddressesForKey(Key const& key) = 0;
virtual Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) = 0;
virtual Future<int64_t> getEstimatedRangeSizeBytes(KeyRange const& keys) = 0;

View File

@ -18,118 +18,269 @@
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/PaxosConfigTransaction.h"
#include "flow/actorcompiler.h" // must be last include
class PaxosConfigTransactionImpl {};
class PaxosConfigTransactionImpl {
ConfigTransactionCommitRequest toCommit;
Future<ConfigGeneration> getGenerationFuture;
std::vector<ConfigTransactionInterface> ctis;
int numRetries{ 0 };
bool committed{ false };
Optional<UID> dID;
Database cx;
ACTOR static Future<ConfigGeneration> getGeneration(PaxosConfigTransactionImpl* self) {
state std::vector<Future<ConfigTransactionGetGenerationReply>> getGenerationFutures;
getGenerationFutures.reserve(self->ctis.size());
for (auto const& cti : self->ctis) {
getGenerationFutures.push_back(cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{}));
}
// FIXME: Must tolerate failures and disagreement
wait(waitForAll(getGenerationFutures));
return getGenerationFutures[0].get().generation;
}
ACTOR static Future<Optional<Value>> get(PaxosConfigTransactionImpl* self, Key key) {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
state ConfigKey configKey = ConfigKey::decodeKey(key);
ConfigGeneration generation = wait(self->getGenerationFuture);
// TODO: Load balance
ConfigTransactionGetReply reply =
wait(self->ctis[0].get.getReply(ConfigTransactionGetRequest{ generation, configKey }));
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
return Optional<Value>{};
}
}
ACTOR static Future<RangeResult> getConfigClasses(PaxosConfigTransactionImpl* self) {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
ConfigGeneration generation = wait(self->getGenerationFuture);
// TODO: Load balance
ConfigTransactionGetConfigClassesReply reply =
wait(self->ctis[0].getClasses.getReply(ConfigTransactionGetConfigClassesRequest{ generation }));
RangeResult result;
result.reserve(result.arena(), reply.configClasses.size());
for (const auto& configClass : reply.configClasses) {
result.push_back_deep(result.arena(), KeyValueRef(configClass, ""_sr));
}
return result;
}
ACTOR static Future<RangeResult> getKnobs(PaxosConfigTransactionImpl* self, Optional<Key> configClass) {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
ConfigGeneration generation = wait(self->getGenerationFuture);
// TODO: Load balance
ConfigTransactionGetKnobsReply reply =
wait(self->ctis[0].getKnobs.getReply(ConfigTransactionGetKnobsRequest{ generation, configClass }));
RangeResult result;
result.reserve(result.arena(), reply.knobNames.size());
for (const auto& knobName : reply.knobNames) {
result.push_back_deep(result.arena(), KeyValueRef(knobName, ""_sr));
}
return result;
}
ACTOR static Future<Void> commit(PaxosConfigTransactionImpl* self) {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
wait(store(self->toCommit.generation, self->getGenerationFuture));
self->toCommit.annotation.timestamp = now();
std::vector<Future<Void>> commitFutures;
commitFutures.reserve(self->ctis.size());
for (const auto& cti : self->ctis) {
commitFutures.push_back(cti.commit.getReply(self->toCommit));
}
// FIXME: Must tolerate failures and disagreement
wait(quorum(commitFutures, commitFutures.size() / 2 + 1));
self->committed = true;
return Void();
}
public:
Future<Version> getReadVersion() {
if (!getGenerationFuture.isValid()) {
getGenerationFuture = getGeneration(this);
}
return map(getGenerationFuture, [](auto const& gen) { return gen.committedVersion; });
}
Optional<Version> getCachedReadVersion() const {
if (getGenerationFuture.isValid() && getGenerationFuture.isReady() && !getGenerationFuture.isError()) {
return getGenerationFuture.get().committedVersion;
} else {
return {};
}
}
Version getCommittedVersion() const { return committed ? getGenerationFuture.get().liveVersion : ::invalidVersion; }
int64_t getApproximateSize() const { return toCommit.expectedSize(); }
void set(KeyRef key, ValueRef value) { toCommit.set(key, value); }
void clear(KeyRef key) { toCommit.clear(key); }
Future<Optional<Value>> get(Key const& key) { return get(this, key); }
Future<RangeResult> getRange(KeyRangeRef keys) {
if (keys == configClassKeys) {
return getConfigClasses(this);
} else if (keys == globalConfigKnobKeys) {
return getKnobs(this, {});
} else if (configKnobKeys.contains(keys) && keys.singleKeyRange()) {
const auto configClass = keys.begin.removePrefix(configKnobKeys.begin);
return getKnobs(this, configClass);
} else {
throw invalid_config_db_range_read();
}
}
Future<Void> onError(Error const& e) {
// TODO: Improve this:
if (e.code() == error_code_transaction_too_old) {
reset();
return delay((1 << numRetries++) * 0.01 * deterministicRandom()->random01());
}
throw e;
}
void debugTransaction(UID dID) { this->dID = dID; }
void reset() {
getGenerationFuture = Future<ConfigGeneration>{};
toCommit = {};
committed = false;
}
void fullReset() {
numRetries = 0;
dID = {};
reset();
}
void checkDeferredError(Error const& deferredError) const {
if (deferredError.code() != invalid_error_code) {
throw deferredError;
}
if (cx.getPtr()) {
cx->checkDeferredError();
}
}
Future<Void> commit() { return commit(this); }
PaxosConfigTransactionImpl(Database const& cx) : cx(cx) {
auto coordinators = cx->getConnectionFile()->getConnectionString().coordinators();
ctis.reserve(coordinators.size());
for (const auto& coordinator : coordinators) {
ctis.emplace_back(coordinator);
}
}
PaxosConfigTransactionImpl(std::vector<ConfigTransactionInterface> const& ctis) : ctis(ctis) {}
};
Future<Version> PaxosConfigTransaction::getReadVersion() {
// TODO: Implement
return ::invalidVersion;
return impl().getReadVersion();
}
Optional<Version> PaxosConfigTransaction::getCachedReadVersion() const {
// TODO: Implement
return ::invalidVersion;
return impl().getCachedReadVersion();
}
Future<Optional<Value>> PaxosConfigTransaction::get(Key const& key, Snapshot snapshot) {
// TODO: Implement
return Optional<Value>{};
Future<Optional<Value>> PaxosConfigTransaction::get(Key const& key, Snapshot) {
return impl().get(key);
}
Future<Standalone<RangeResultRef>> PaxosConfigTransaction::getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot snapshot,
Reverse reverse) {
// TODO: Implement
ASSERT(false);
return Standalone<RangeResultRef>{};
Future<RangeResult> PaxosConfigTransaction::getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot snapshot,
Reverse reverse) {
if (reverse) {
throw client_invalid_operation();
}
return impl().getRange(KeyRangeRef(begin.getKey(), end.getKey()));
}
Future<Standalone<RangeResultRef>> PaxosConfigTransaction::getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse) {
// TODO: Implement
ASSERT(false);
return Standalone<RangeResultRef>{};
Future<RangeResult> PaxosConfigTransaction::getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse) {
if (reverse) {
throw client_invalid_operation();
}
return impl().getRange(KeyRangeRef(begin.getKey(), end.getKey()));
}
void PaxosConfigTransaction::set(KeyRef const& key, ValueRef const& value) {
// TODO: Implememnt
ASSERT(false);
return impl().set(key, value);
}
void PaxosConfigTransaction::clear(KeyRef const& key) {
// TODO: Implememnt
ASSERT(false);
return impl().clear(key);
}
Future<Void> PaxosConfigTransaction::commit() {
// TODO: Implememnt
ASSERT(false);
return Void();
return impl().commit();
}
Version PaxosConfigTransaction::getCommittedVersion() const {
// TODO: Implement
ASSERT(false);
return ::invalidVersion;
return impl().getCommittedVersion();
}
int64_t PaxosConfigTransaction::getApproximateSize() const {
// TODO: Implement
ASSERT(false);
return 0;
return impl().getApproximateSize();
}
void PaxosConfigTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
// TODO: Implement
ASSERT(false);
// TODO: Support using this option to determine atomicity
}
Future<Void> PaxosConfigTransaction::onError(Error const& e) {
// TODO: Implement
ASSERT(false);
return Void();
return impl().onError(e);
}
void PaxosConfigTransaction::cancel() {
// TODO: Implement
ASSERT(false);
// TODO: Implement someday
throw client_invalid_operation();
}
void PaxosConfigTransaction::reset() {
// TODO: Implement
ASSERT(false);
impl().reset();
}
void PaxosConfigTransaction::fullReset() {
// TODO: Implement
ASSERT(false);
impl().fullReset();
}
void PaxosConfigTransaction::debugTransaction(UID dID) {
// TODO: Implement
ASSERT(false);
impl().debugTransaction(dID);
}
void PaxosConfigTransaction::checkDeferredError() const {
// TODO: Implement
ASSERT(false);
impl().checkDeferredError(deferredError);
}
PaxosConfigTransaction::PaxosConfigTransaction() {
// TODO: Implement
ASSERT(false);
}
PaxosConfigTransaction::PaxosConfigTransaction(std::vector<ConfigTransactionInterface> const& ctis)
: _impl(std::make_unique<PaxosConfigTransactionImpl>(ctis)) {}
PaxosConfigTransaction::PaxosConfigTransaction() = default;
PaxosConfigTransaction::~PaxosConfigTransaction() = default;
void PaxosConfigTransaction::setDatabase(Database const& cx) {
// TODO: Implement
ASSERT(false);
_impl = std::make_unique<PaxosConfigTransactionImpl>(cx);
}

View File

@ -33,6 +33,7 @@ class PaxosConfigTransaction final : public IConfigTransaction, public FastAlloc
PaxosConfigTransactionImpl& impl() { return *_impl; }
public:
PaxosConfigTransaction(std::vector<ConfigTransactionInterface> const&);
PaxosConfigTransaction();
~PaxosConfigTransaction();
void setDatabase(Database const&) override;
@ -40,16 +41,16 @@ public:
Optional<Version> getCachedReadVersion() const override;
Future<Optional<Value>> get(Key const& key, Snapshot = Snapshot::False) override;
Future<Standalone<RangeResultRef>> getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<Standalone<RangeResultRef>> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<RangeResult> getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<RangeResult> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
void set(KeyRef const& key, ValueRef const& value) override;
void clear(KeyRangeRef const&) override { throw client_invalid_operation(); }
void clear(KeyRef const&) override;

View File

@ -74,20 +74,20 @@ public:
Optional<Version> getCachedReadVersion() const override { return tr.getCachedReadVersion(); }
Future<Optional<Value>> get(const Key& key, Snapshot = Snapshot::False) override;
Future<Key> getKey(const KeySelector& key, Snapshot = Snapshot::False) override;
Future<Standalone<RangeResultRef>> getRange(const KeySelector& begin,
const KeySelector& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<Standalone<RangeResultRef>> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<Standalone<RangeResultRef>> getRange(const KeyRange& keys,
int limit,
Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) {
Future<RangeResult> getRange(const KeySelector& begin,
const KeySelector& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<RangeResult> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<RangeResult> getRange(const KeyRange& keys,
int limit,
Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) {
return getRange(KeySelector(firstGreaterOrEqual(keys.begin), keys.arena()),
KeySelector(firstGreaterOrEqual(keys.end), keys.arena()),
limit,

View File

@ -30,39 +30,40 @@
class SimpleConfigTransactionImpl {
ConfigTransactionCommitRequest toCommit;
Future<Version> getVersionFuture;
Future<ConfigGeneration> getGenerationFuture;
ConfigTransactionInterface cti;
int numRetries{ 0 };
bool committed{ false };
Optional<UID> dID;
Database cx;
ACTOR static Future<Version> getReadVersion(SimpleConfigTransactionImpl* self) {
ACTOR static Future<ConfigGeneration> getGeneration(SimpleConfigTransactionImpl* self) {
if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGettingReadVersion", self->dID.get());
}
ConfigTransactionGetVersionRequest req;
ConfigTransactionGetVersionReply reply =
wait(self->cti.getVersion.getReply(ConfigTransactionGetVersionRequest{}));
ConfigTransactionGetGenerationRequest req;
ConfigTransactionGetGenerationReply reply =
wait(self->cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{}));
if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGotReadVersion", self->dID.get()).detail("Version", reply.version);
TraceEvent("SimpleConfigTransactionGotReadVersion", self->dID.get())
.detail("Version", reply.generation.liveVersion);
}
return reply.version;
return reply.generation;
}
ACTOR static Future<Optional<Value>> get(SimpleConfigTransactionImpl* self, KeyRef key) {
if (!self->getVersionFuture.isValid()) {
self->getVersionFuture = getReadVersion(self);
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
state ConfigKey configKey = ConfigKey::decodeKey(key);
Version version = wait(self->getVersionFuture);
ConfigGeneration generation = wait(self->getGenerationFuture);
if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGettingValue", self->dID.get())
.detail("ConfigClass", configKey.configClass)
.detail("KnobName", configKey.knobName);
}
ConfigTransactionGetReply reply =
wait(self->cti.get.getReply(ConfigTransactionGetRequest{ version, configKey }));
wait(self->cti.get.getReply(ConfigTransactionGetRequest{ generation, configKey }));
if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGotValue", self->dID.get())
.detail("Value", reply.value.get().toString());
@ -74,29 +75,28 @@ class SimpleConfigTransactionImpl {
}
}
ACTOR static Future<Standalone<RangeResultRef>> getConfigClasses(SimpleConfigTransactionImpl* self) {
if (!self->getVersionFuture.isValid()) {
self->getVersionFuture = getReadVersion(self);
ACTOR static Future<RangeResult> getConfigClasses(SimpleConfigTransactionImpl* self) {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
Version version = wait(self->getVersionFuture);
ConfigGeneration generation = wait(self->getGenerationFuture);
ConfigTransactionGetConfigClassesReply reply =
wait(self->cti.getClasses.getReply(ConfigTransactionGetConfigClassesRequest{ version }));
Standalone<RangeResultRef> result;
wait(self->cti.getClasses.getReply(ConfigTransactionGetConfigClassesRequest{ generation }));
RangeResult result;
for (const auto& configClass : reply.configClasses) {
result.push_back_deep(result.arena(), KeyValueRef(configClass, ""_sr));
}
return result;
}
ACTOR static Future<Standalone<RangeResultRef>> getKnobs(SimpleConfigTransactionImpl* self,
Optional<Key> configClass) {
if (!self->getVersionFuture.isValid()) {
self->getVersionFuture = getReadVersion(self);
ACTOR static Future<RangeResult> getKnobs(SimpleConfigTransactionImpl* self, Optional<Key> configClass) {
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
Version version = wait(self->getVersionFuture);
ConfigGeneration generation = wait(self->getGenerationFuture);
ConfigTransactionGetKnobsReply reply =
wait(self->cti.getKnobs.getReply(ConfigTransactionGetKnobsRequest{ version, configClass }));
Standalone<RangeResultRef> result;
wait(self->cti.getKnobs.getReply(ConfigTransactionGetKnobsRequest{ generation, configClass }));
RangeResult result;
for (const auto& knobName : reply.knobNames) {
result.push_back_deep(result.arena(), KeyValueRef(knobName, ""_sr));
}
@ -104,10 +104,10 @@ class SimpleConfigTransactionImpl {
}
ACTOR static Future<Void> commit(SimpleConfigTransactionImpl* self) {
if (!self->getVersionFuture.isValid()) {
self->getVersionFuture = getReadVersion(self);
if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self);
}
wait(store(self->toCommit.version, self->getVersionFuture));
wait(store(self->toCommit.generation, self->getGenerationFuture));
self->toCommit.annotation.timestamp = now();
wait(self->cti.commit.getReply(self->toCommit));
self->committed = true;
@ -123,29 +123,13 @@ public:
SimpleConfigTransactionImpl(ConfigTransactionInterface const& cti) : cti(cti) {}
void set(KeyRef key, ValueRef value) {
if (key == configTransactionDescriptionKey) {
toCommit.annotation.description = KeyRef(toCommit.arena, value);
} else {
ConfigKey configKey = ConfigKeyRef::decodeKey(key);
auto knobValue = IKnobCollection::parseKnobValue(
configKey.knobName.toString(), value.toString(), IKnobCollection::Type::TEST);
toCommit.mutations.emplace_back_deep(toCommit.arena, configKey, knobValue.contents());
}
}
void set(KeyRef key, ValueRef value) { toCommit.set(key, value); }
void clear(KeyRef key) {
if (key == configTransactionDescriptionKey) {
toCommit.annotation.description = ""_sr;
} else {
toCommit.mutations.emplace_back_deep(
toCommit.arena, ConfigKeyRef::decodeKey(key), Optional<KnobValueRef>{});
}
}
void clear(KeyRef key) { toCommit.clear(key); }
Future<Optional<Value>> get(KeyRef key) { return get(this, key); }
Future<Standalone<RangeResultRef>> getRange(KeyRangeRef keys) {
Future<RangeResult> getRange(KeyRangeRef keys) {
if (keys == configClassKeys) {
return getConfigClasses(this);
} else if (keys == globalConfigKnobKeys) {
@ -170,23 +154,23 @@ public:
}
Future<Version> getReadVersion() {
if (!getVersionFuture.isValid())
getVersionFuture = getReadVersion(this);
return getVersionFuture;
if (!getGenerationFuture.isValid())
getGenerationFuture = getGeneration(this);
return map(getGenerationFuture, [](auto const& gen) { return gen.committedVersion; });
}
Optional<Version> getCachedReadVersion() const {
if (getVersionFuture.isValid() && getVersionFuture.isReady() && !getVersionFuture.isError()) {
return getVersionFuture.get();
if (getGenerationFuture.isValid() && getGenerationFuture.isReady() && !getGenerationFuture.isError()) {
return getGenerationFuture.get().committedVersion;
} else {
return {};
}
}
Version getCommittedVersion() const { return committed ? getVersionFuture.get() : ::invalidVersion; }
Version getCommittedVersion() const { return committed ? getGenerationFuture.get().liveVersion : ::invalidVersion; }
void reset() {
getVersionFuture = Future<Version>{};
getGenerationFuture = Future<ConfigGeneration>{};
toCommit = {};
committed = false;
}
@ -225,19 +209,25 @@ Future<Optional<Value>> SimpleConfigTransaction::get(Key const& key, Snapshot sn
return impl().get(key);
}
Future<Standalone<RangeResultRef>> SimpleConfigTransaction::getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot snapshot,
Reverse reverse) {
Future<RangeResult> SimpleConfigTransaction::getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot snapshot,
Reverse reverse) {
if (reverse) {
throw client_invalid_operation();
}
return impl().getRange(KeyRangeRef(begin.getKey(), end.getKey()));
}
Future<Standalone<RangeResultRef>> SimpleConfigTransaction::getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse) {
Future<RangeResult> SimpleConfigTransaction::getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse) {
if (reverse) {
throw client_invalid_operation();
}
return impl().getRange(KeyRangeRef(begin.getKey(), end.getKey()));
}

View File

@ -50,16 +50,16 @@ public:
Optional<Version> getCachedReadVersion() const override;
Future<Optional<Value>> get(Key const& key, Snapshot = Snapshot::False) override;
Future<Standalone<RangeResultRef>> getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<Standalone<RangeResultRef>> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<RangeResult> getRange(KeySelector const& begin,
KeySelector const& end,
int limit,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<RangeResult> getRange(KeySelector begin,
KeySelector end,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False) override;
Future<Void> commit() override;
Version getCommittedVersion() const override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;

View File

@ -51,7 +51,7 @@ constexpr UID WLTOKEN_PING_PACKET(-1, 1);
constexpr int PACKET_LEN_WIDTH = sizeof(uint32_t);
const uint64_t TOKEN_STREAM_FLAG = 1;
static constexpr int WLTOKEN_COUNTS = 20; // number of wellKnownEndpoints
static constexpr int WLTOKEN_COUNTS = 21; // number of wellKnownEndpoints
class EndpointMap : NonCopyable {
public:

View File

@ -11,6 +11,8 @@ set(FDBSERVER_SRCS
ConfigDatabaseUnitTests.actor.cpp
ConfigFollowerInterface.cpp
ConfigFollowerInterface.h
ConfigNode.actor.cpp
ConfigNode.h
ConflictSet.h
CoordinatedState.actor.cpp
CoordinatedState.h
@ -28,8 +30,6 @@ set(FDBSERVER_SRCS
FDBExecHelper.actor.cpp
FDBExecHelper.actor.h
GrvProxyServer.actor.cpp
IConfigDatabaseNode.cpp
IConfigDatabaseNode.h
IConfigConsumer.cpp
IConfigConsumer.h
IDiskQueue.h
@ -72,8 +72,6 @@ set(FDBSERVER_SRCS
OnDemandStore.h
PaxosConfigConsumer.actor.cpp
PaxosConfigConsumer.h
PaxosConfigDatabaseNode.actor.cpp
PaxosConfigDatabaseNode.h
ProxyCommitData.actor.h
pubsub.actor.cpp
pubsub.h
@ -105,7 +103,6 @@ set(FDBSERVER_SRCS
ServerDBInfo.h
SimpleConfigConsumer.actor.cpp
SimpleConfigConsumer.h
SimpleConfigDatabaseNode.actor.cpp
SimulatedCluster.actor.cpp
SimulatedCluster.h
SkipList.cpp

View File

@ -22,7 +22,7 @@
#include "fdbclient/IConfigTransaction.h"
#include "fdbclient/TestKnobCollection.h"
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/IConfigDatabaseNode.h"
#include "fdbserver/ConfigNode.h"
#include "fdbserver/LocalConfiguration.h"
#include "fdbclient/Tuple.h"
#include "flow/UnitTest.h"
@ -55,7 +55,7 @@ class WriteToTransactionEnvironment {
std::string dataDir;
ConfigTransactionInterface cti;
ConfigFollowerInterface cfi;
Reference<IConfigDatabaseNode> node;
Reference<ConfigNode> node;
Future<Void> ctiServer;
Future<Void> cfiServer;
Version lastWrittenVersion{ 0 };
@ -65,11 +65,10 @@ class WriteToTransactionEnvironment {
return StringRef(reinterpret_cast<uint8_t const*>(s.c_str()), s.size());
}
ACTOR template <class T>
static Future<Void> set(WriteToTransactionEnvironment* self,
Optional<KeyRef> configClass,
T value,
KeyRef knobName) {
ACTOR static Future<Void> set(WriteToTransactionEnvironment* self,
Optional<KeyRef> configClass,
int64_t value,
KeyRef knobName) {
state Reference<IConfigTransaction> tr = IConfigTransaction::createTestSimple(self->cti);
auto configKey = encodeConfigKey(configClass, knobName);
tr->set(configKey, longToValue(value));
@ -94,13 +93,12 @@ class WriteToTransactionEnvironment {
public:
WriteToTransactionEnvironment(std::string const& dataDir)
: dataDir(dataDir), node(IConfigDatabaseNode::createSimple(dataDir)) {
: dataDir(dataDir), node(makeReference<ConfigNode>(dataDir)) {
platform::eraseDirectoryRecursive(dataDir);
setup();
}
template <class T>
Future<Void> set(Optional<KeyRef> configClass, T value, KeyRef knobName = "test_long"_sr) {
Future<Void> set(Optional<KeyRef> configClass, int64_t value, KeyRef knobName = "test_long"_sr) {
return set(this, configClass, value, knobName);
}
@ -111,7 +109,7 @@ public:
void restartNode() {
cfiServer.cancel();
ctiServer.cancel();
node = IConfigDatabaseNode::createSimple(dataDir);
node = makeReference<ConfigNode>(dataDir);
setup();
}
@ -293,7 +291,7 @@ class TransactionEnvironment {
IConfigTransaction::createTestSimple(self->writeTo.getTransactionInterface());
state KeySelector begin = firstGreaterOrEqual(configClassKeys.begin);
state KeySelector end = firstGreaterOrEqual(configClassKeys.end);
Standalone<RangeResultRef> range = wait(tr->getRange(begin, end, 1000));
RangeResult range = wait(tr->getRange(begin, end, 1000));
Standalone<VectorRef<KeyRef>> result;
for (const auto& kv : range) {
result.push_back_deep(result.arena(), kv.key);
@ -312,7 +310,7 @@ class TransactionEnvironment {
}
KeySelector begin = firstGreaterOrEqual(keys.begin);
KeySelector end = firstGreaterOrEqual(keys.end);
Standalone<RangeResultRef> range = wait(tr->getRange(begin, end, 1000));
RangeResult range = wait(tr->getRange(begin, end, 1000));
Standalone<VectorRef<KeyRef>> result;
for (const auto& kv : range) {
result.push_back_deep(result.arena(), kv.key);

View File

@ -27,6 +27,7 @@ void ConfigFollowerInterface::setupWellKnownEndpoints() {
TaskPriority::Coordination);
getChanges.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETCHANGES, TaskPriority::Coordination);
compact.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_COMPACT, TaskPriority::Coordination);
getCommittedVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION, TaskPriority::Coordination);
}
ConfigFollowerInterface::ConfigFollowerInterface() : _id(deterministicRandom()->randomUniqueID()) {}
@ -35,7 +36,8 @@ ConfigFollowerInterface::ConfigFollowerInterface(NetworkAddress const& remote)
: _id(deterministicRandom()->randomUniqueID()),
getSnapshotAndChanges(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES)),
getChanges(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCHANGES)),
compact(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_COMPACT)) {}
compact(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_COMPACT)),
getCommittedVersion(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION)) {}
bool ConfigFollowerInterface::operator==(ConfigFollowerInterface const& rhs) const {
return _id == rhs._id;

View File

@ -66,7 +66,6 @@ using VersionedConfigCommitAnnotation = Standalone<VersionedConfigCommitAnnotati
struct ConfigFollowerGetSnapshotAndChangesReply {
static constexpr FileIdentifier file_identifier = 1734095;
Version snapshotVersion;
Version changesVersion;
std::map<ConfigKey, KnobValue> snapshot;
// TODO: Share arena
Standalone<VectorRef<VersionedConfigMutationRef>> changes;
@ -76,61 +75,64 @@ struct ConfigFollowerGetSnapshotAndChangesReply {
template <class Snapshot>
explicit ConfigFollowerGetSnapshotAndChangesReply(
Version snapshotVersion,
Version changesVersion,
Snapshot&& snapshot,
Standalone<VectorRef<VersionedConfigMutationRef>> changes,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations)
: snapshotVersion(snapshotVersion), changesVersion(changesVersion), snapshot(std::forward<Snapshot>(snapshot)),
changes(changes), annotations(annotations) {
ASSERT_GE(changesVersion, snapshotVersion);
}
: snapshotVersion(snapshotVersion), snapshot(std::forward<Snapshot>(snapshot)), changes(changes),
annotations(annotations) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, snapshotVersion, changesVersion, snapshot, changes);
serializer(ar, snapshotVersion, snapshot, changes);
}
};
struct ConfigFollowerGetSnapshotAndChangesRequest {
static constexpr FileIdentifier file_identifier = 294811;
ReplyPromise<ConfigFollowerGetSnapshotAndChangesReply> reply;
Version mostRecentVersion;
ConfigFollowerGetSnapshotAndChangesRequest() = default;
explicit ConfigFollowerGetSnapshotAndChangesRequest(Version mostRecentVersion)
: mostRecentVersion(mostRecentVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
serializer(ar, reply, mostRecentVersion);
}
};
struct ConfigFollowerGetChangesReply {
static constexpr FileIdentifier file_identifier = 234859;
Version mostRecentVersion;
// TODO: Share arena
Standalone<VectorRef<VersionedConfigMutationRef>> changes;
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations;
ConfigFollowerGetChangesReply() : mostRecentVersion(0) {}
ConfigFollowerGetChangesReply() = default;
explicit ConfigFollowerGetChangesReply(Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations)
: mostRecentVersion(mostRecentVersion), changes(changes), annotations(annotations) {}
: changes(changes), annotations(annotations) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mostRecentVersion, changes, annotations);
serializer(ar, changes, annotations);
}
};
struct ConfigFollowerGetChangesRequest {
static constexpr FileIdentifier file_identifier = 178935;
Version lastSeenVersion{ 0 };
Version mostRecentVersion{ 0 };
ReplyPromise<ConfigFollowerGetChangesReply> reply;
ConfigFollowerGetChangesRequest() = default;
explicit ConfigFollowerGetChangesRequest(Version lastSeenVersion) : lastSeenVersion(lastSeenVersion) {}
explicit ConfigFollowerGetChangesRequest(Version lastSeenVersion, Version mostRecentVersion)
: lastSeenVersion(lastSeenVersion), mostRecentVersion(mostRecentVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, lastSeenVersion, reply);
serializer(ar, lastSeenVersion, mostRecentVersion, reply);
}
};
@ -148,6 +150,29 @@ struct ConfigFollowerCompactRequest {
}
};
struct ConfigFollowerGetCommittedVersionReply {
static constexpr FileIdentifier file_identifier = 9214735;
Version version;
ConfigFollowerGetCommittedVersionReply() = default;
explicit ConfigFollowerGetCommittedVersionReply(Version version) : version(version) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version);
}
};
struct ConfigFollowerGetCommittedVersionRequest {
static constexpr FileIdentifier file_identifier = 1093472;
ReplyPromise<ConfigFollowerGetCommittedVersionReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
/*
* Configuration database nodes serve a ConfigFollowerInterface which contains well known endpoints,
* used by workers to receive configuration database updates
@ -160,6 +185,7 @@ public:
RequestStream<ConfigFollowerGetSnapshotAndChangesRequest> getSnapshotAndChanges;
RequestStream<ConfigFollowerGetChangesRequest> getChanges;
RequestStream<ConfigFollowerCompactRequest> compact;
RequestStream<ConfigFollowerGetCommittedVersionRequest> getCommittedVersion;
ConfigFollowerInterface();
void setupWellKnownEndpoints();
@ -170,6 +196,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact);
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, getCommittedVersion);
}
};

View File

@ -1,5 +1,5 @@
/*
* SimpleConfigDatabaseNode.actor.cpp
* ConfigNode.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -21,7 +21,7 @@
#include <map>
#include "fdbclient/SystemData.h"
#include "fdbserver/SimpleConfigDatabaseNode.h"
#include "fdbserver/ConfigNode.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/OnDemandStore.h"
#include "flow/Arena.h"
@ -33,8 +33,7 @@
namespace {
const KeyRef lastCompactedVersionKey = "lastCompactedVersion"_sr;
const KeyRef liveTransactionVersionKey = "liveTransactionVersion"_sr;
const KeyRef committedVersionKey = "committedVersion"_sr;
const KeyRef currentGenerationKey = "currentGeneration"_sr;
const KeyRangeRef kvKeys = KeyRangeRef("kv/"_sr, "kv0"_sr);
const KeyRangeRef mutationKeys = KeyRangeRef("mutation/"_sr, "mutation0"_sr);
const KeyRangeRef annotationKeys = KeyRangeRef("annotation/"_sr, "annotation0"_sr);
@ -65,9 +64,9 @@ Version getVersionFromVersionedMutationKey(KeyRef versionedMutationKey) {
return fromBigEndian64(bigEndianResult);
}
} //namespace
} // namespace
TEST_CASE("/fdbserver/ConfigDB/SimpleConfigDatabaseNode/Internal/versionedMutationKeys") {
TEST_CASE("/fdbserver/ConfigDB/ConfigNode/Internal/versionedMutationKeys") {
std::vector<Key> keys;
for (Version version = 0; version < 1000; ++version) {
for (int index = 0; index < 5; ++index) {
@ -80,7 +79,7 @@ TEST_CASE("/fdbserver/ConfigDB/SimpleConfigDatabaseNode/Internal/versionedMutati
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/SimpleConfigDatabaseNode/Internal/versionedMutationKeyOrdering") {
TEST_CASE("/fdbserver/ConfigDB/ConfigNode/Internal/versionedMutationKeyOrdering") {
Standalone<VectorRef<KeyRef>> keys;
for (Version version = 0; version < 1000; ++version) {
for (auto index = 0; index < 5; ++index) {
@ -94,7 +93,7 @@ TEST_CASE("/fdbserver/ConfigDB/SimpleConfigDatabaseNode/Internal/versionedMutati
return Void();
}
class SimpleConfigDatabaseNodeImpl {
class ConfigNodeImpl {
UID id;
OnDemandStore kvStore;
CounterCollection cc;
@ -104,6 +103,7 @@ class SimpleConfigDatabaseNodeImpl {
Counter successfulChangeRequests;
Counter failedChangeRequests;
Counter snapshotRequests;
Counter getCommittedVersionRequests;
// Transaction counters
Counter successfulCommits;
@ -114,31 +114,19 @@ class SimpleConfigDatabaseNodeImpl {
Counter newVersionRequests;
Future<Void> logger;
ACTOR static Future<Version> getLiveTransactionVersion(SimpleConfigDatabaseNodeImpl *self) {
Optional<Value> value = wait(self->kvStore->readValue(liveTransactionVersionKey));
state Version liveTransactionVersion = 0;
ACTOR static Future<ConfigGeneration> getGeneration(ConfigNodeImpl* self) {
state ConfigGeneration generation;
Optional<Value> value = wait(self->kvStore->readValue(currentGenerationKey));
if (value.present()) {
liveTransactionVersion = BinaryReader::fromStringRef<Version>(value.get(), IncludeVersion());
generation = BinaryReader::fromStringRef<ConfigGeneration>(value.get(), IncludeVersion());
} else {
self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(liveTransactionVersion, IncludeVersion())));
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion())));
wait(self->kvStore->commit());
}
return liveTransactionVersion;
return generation;
}
ACTOR static Future<Version> getCommittedVersion(SimpleConfigDatabaseNodeImpl *self) {
Optional<Value> value = wait(self->kvStore->readValue(committedVersionKey));
state Version committedVersion = 0;
if (value.present()) {
committedVersion = BinaryReader::fromStringRef<Version>(value.get(), IncludeVersion());
} else {
self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(committedVersion, IncludeVersion())));
wait(self->kvStore->commit());
}
return committedVersion;
}
ACTOR static Future<Version> getLastCompactedVersion(SimpleConfigDatabaseNodeImpl* self) {
ACTOR static Future<Version> getLastCompactedVersion(ConfigNodeImpl* self) {
Optional<Value> value = wait(self->kvStore->readValue(lastCompactedVersionKey));
state Version lastCompactedVersion = 0;
if (value.present()) {
@ -152,12 +140,13 @@ class SimpleConfigDatabaseNodeImpl {
}
// Returns all commit annotations between for commits with version in [startVersion, endVersion]
ACTOR static Future<Standalone<VectorRef<VersionedConfigCommitAnnotationRef>>>
getAnnotations(SimpleConfigDatabaseNodeImpl* self, Version startVersion, Version endVersion) {
ACTOR static Future<Standalone<VectorRef<VersionedConfigCommitAnnotationRef>>> getAnnotations(ConfigNodeImpl* self,
Version startVersion,
Version endVersion) {
Key startKey = versionedAnnotationKey(startVersion);
Key endKey = versionedAnnotationKey(endVersion + 1);
state KeyRangeRef keys(startKey, endKey);
Standalone<RangeResultRef> range = wait(self->kvStore->readRange(keys));
RangeResult range = wait(self->kvStore->readRange(keys));
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> result;
for (const auto& kv : range) {
auto version = getVersionFromVersionedAnnotationKey(kv.key);
@ -169,14 +158,15 @@ class SimpleConfigDatabaseNodeImpl {
}
// Returns all mutations with version in [startVersion, endVersion]
ACTOR static Future<Standalone<VectorRef<VersionedConfigMutationRef>>>
getMutations(SimpleConfigDatabaseNodeImpl* self, Version startVersion, Version endVersion) {
ACTOR static Future<Standalone<VectorRef<VersionedConfigMutationRef>>> getMutations(ConfigNodeImpl* self,
Version startVersion,
Version endVersion) {
Key startKey = versionedMutationKey(startVersion, 0);
Key endKey = versionedMutationKey(endVersion + 1, 0);
state KeyRangeRef keys(startKey, endKey);
Standalone<RangeResultRef> range = wait(self->kvStore->readRange(keys));
RangeResult range = wait(self->kvStore->readRange(keys));
Standalone<VectorRef<VersionedConfigMutationRef>> result;
for (const auto &kv : range) {
for (const auto& kv : range) {
auto version = getVersionFromVersionedMutationKey(kv.key);
ASSERT_LE(version, endVersion);
auto mutation = ObjectReader::fromStringRef<ConfigMutation>(kv.value, IncludeVersion());
@ -185,19 +175,20 @@ class SimpleConfigDatabaseNodeImpl {
return result;
}
ACTOR static Future<Void> getChanges(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetChangesRequest req) {
ACTOR static Future<Void> getChanges(ConfigNodeImpl* self, ConfigFollowerGetChangesRequest req) {
Version lastCompactedVersion = wait(getLastCompactedVersion(self));
if (req.lastSeenVersion < lastCompactedVersion) {
++self->failedChangeRequests;
req.reply.sendError(version_already_compacted());
return Void();
}
state Version committedVersion = wait(getCommittedVersion(self));
state Version committedVersion =
wait(map(getGeneration(self), [](auto const& gen) { return gen.committedVersion; }));
state Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, req.lastSeenVersion + 1, committedVersion));
state Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> versionedAnnotations =
wait(getAnnotations(self, req.lastSeenVersion + 1, committedVersion));
TraceEvent(SevDebug, "ConfigDatabaseNodeSendingChanges")
TraceEvent(SevDebug, "ConfigDatabaseNodeSendingChanges", self->id)
.detail("ReqLastSeenVersion", req.lastSeenVersion)
.detail("CommittedVersion", committedVersion)
.detail("NumMutations", versionedMutations.size())
@ -209,17 +200,19 @@ class SimpleConfigDatabaseNodeImpl {
// New transactions increment the database's current live version. This effectively serves as a lock, providing
// serializability
ACTOR static Future<Void> getNewVersion(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetVersionRequest req) {
state Version currentVersion = wait(getLiveTransactionVersion(self));
self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion())));
ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) {
state ConfigGeneration generation = wait(getGeneration(self));
++generation.liveVersion;
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion())));
wait(self->kvStore->commit());
req.reply.send(ConfigTransactionGetVersionReply(currentVersion));
req.reply.send(ConfigTransactionGetGenerationReply{ generation });
return Void();
}
ACTOR static Future<Void> get(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetRequest req) {
Version currentVersion = wait(getLiveTransactionVersion(self));
if (req.version != currentVersion) {
ACTOR static Future<Void> get(ConfigNodeImpl* self, ConfigTransactionGetRequest req) {
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
// TODO: Also send information about highest seen version
req.reply.sendError(transaction_too_old());
return Void();
}
@ -229,9 +222,10 @@ class SimpleConfigDatabaseNodeImpl {
if (serializedValue.present()) {
value = ObjectReader::fromStringRef<KnobValue>(serializedValue.get(), IncludeVersion());
}
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
for (const auto &versionedMutation : versionedMutations) {
const auto &mutation = versionedMutation.mutation;
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, 0, req.generation.committedVersion));
for (const auto& versionedMutation : versionedMutations) {
const auto& mutation = versionedMutation.mutation;
if (mutation.getKey() == req.key) {
if (mutation.isSet()) {
value = mutation.getValue();
@ -247,14 +241,13 @@ class SimpleConfigDatabaseNodeImpl {
// Retrieve all configuration classes that contain explicitly defined knobs
// TODO: Currently it is possible that extra configuration classes may be returned, we
// may want to fix this to clean up the contract
ACTOR static Future<Void> getConfigClasses(SimpleConfigDatabaseNodeImpl* self,
ConfigTransactionGetConfigClassesRequest req) {
Version currentVersion = wait(getLiveTransactionVersion(self));
if (req.version != currentVersion) {
ACTOR static Future<Void> getConfigClasses(ConfigNodeImpl* self, ConfigTransactionGetConfigClassesRequest req) {
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
req.reply.sendError(transaction_too_old());
return Void();
}
state Standalone<RangeResultRef> snapshot = wait(self->kvStore->readRange(kvKeys));
state RangeResult snapshot = wait(self->kvStore->readRange(kvKeys));
state std::set<Key> configClassesSet;
for (const auto& kv : snapshot) {
auto configKey =
@ -265,7 +258,7 @@ class SimpleConfigDatabaseNodeImpl {
}
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
state Standalone<VectorRef<VersionedConfigMutationRef>> mutations =
wait(getMutations(self, lastCompactedVersion + 1, req.version));
wait(getMutations(self, lastCompactedVersion + 1, req.generation.committedVersion));
for (const auto& versionedMutation : mutations) {
auto configClass = versionedMutation.mutation.getConfigClass();
if (configClass.present()) {
@ -281,14 +274,14 @@ class SimpleConfigDatabaseNodeImpl {
}
// Retrieve all knobs explicitly defined for the specified configuration class
ACTOR static Future<Void> getKnobs(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetKnobsRequest req) {
Version currentVersion = wait(getLiveTransactionVersion(self));
if (req.version != currentVersion) {
ACTOR static Future<Void> getKnobs(ConfigNodeImpl* self, ConfigTransactionGetKnobsRequest req) {
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
req.reply.sendError(transaction_too_old());
return Void();
}
// FIXME: Filtering after reading from disk is very inefficient
state Standalone<RangeResultRef> snapshot = wait(self->kvStore->readRange(kvKeys));
state RangeResult snapshot = wait(self->kvStore->readRange(kvKeys));
state std::set<Key> knobSet;
for (const auto& kv : snapshot) {
auto configKey =
@ -299,7 +292,7 @@ class SimpleConfigDatabaseNodeImpl {
}
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
state Standalone<VectorRef<VersionedConfigMutationRef>> mutations =
wait(getMutations(self, lastCompactedVersion + 1, req.version));
wait(getMutations(self, lastCompactedVersion + 1, req.generation.committedVersion));
for (const auto& versionedMutation : mutations) {
if (versionedMutation.mutation.getConfigClass().template castTo<Key>() == req.configClass) {
if (versionedMutation.mutation.isSet()) {
@ -317,44 +310,45 @@ class SimpleConfigDatabaseNodeImpl {
return Void();
}
ACTOR static Future<Void> commit(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionCommitRequest req) {
Version currentVersion = wait(getLiveTransactionVersion(self));
if (req.version != currentVersion) {
ACTOR static Future<Void> commit(ConfigNodeImpl* self, ConfigTransactionCommitRequest req) {
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) {
++self->failedCommits;
req.reply.sendError(transaction_too_old());
return Void();
}
int index = 0;
for (const auto &mutation : req.mutations) {
Key key = versionedMutationKey(req.version, index++);
for (const auto& mutation : req.mutations) {
Key key = versionedMutationKey(req.generation.liveVersion, index++);
Value value = ObjectWriter::toValue(mutation, IncludeVersion());
if (mutation.isSet()) {
TraceEvent("SimpleConfigDatabaseNodeSetting")
TraceEvent("ConfigNodeSetting")
.detail("ConfigClass", mutation.getConfigClass())
.detail("KnobName", mutation.getKnobName())
.detail("Value", mutation.getValue().toString())
.detail("Version", req.version);
.detail("Version", req.generation.liveVersion);
++self->setMutations;
} else {
++self->clearMutations;
}
self->kvStore->set(KeyValueRef(key, value));
}
self->kvStore->set(
KeyValueRef(versionedAnnotationKey(req.version), BinaryWriter::toValue(req.annotation, IncludeVersion())));
self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(req.version, IncludeVersion())));
self->kvStore->set(KeyValueRef(versionedAnnotationKey(req.generation.liveVersion),
BinaryWriter::toValue(req.annotation, IncludeVersion())));
ConfigGeneration newGeneration = { req.generation.liveVersion, req.generation.liveVersion };
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion())));
wait(self->kvStore->commit());
++self->successfulCommits;
req.reply.send(Void());
return Void();
}
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionInterface const* cti) {
ACTOR static Future<Void> serve(ConfigNodeImpl* self, ConfigTransactionInterface const* cti) {
loop {
choose {
when(ConfigTransactionGetVersionRequest req = waitNext(cti->getVersion.getFuture())) {
when(ConfigTransactionGetGenerationRequest req = waitNext(cti->getGeneration.getFuture())) {
++self->newVersionRequests;
wait(getNewVersion(self, req));
wait(getNewGeneration(self, req));
}
when(ConfigTransactionGetRequest req = waitNext(cti->get.getFuture())) {
++self->getValueRequests;
@ -374,22 +368,20 @@ class SimpleConfigDatabaseNodeImpl {
}
}
ACTOR static Future<Void> getSnapshotAndChanges(SimpleConfigDatabaseNodeImpl* self,
ACTOR static Future<Void> getSnapshotAndChanges(ConfigNodeImpl* self,
ConfigFollowerGetSnapshotAndChangesRequest req) {
state ConfigFollowerGetSnapshotAndChangesReply reply;
Standalone<RangeResultRef> data = wait(self->kvStore->readRange(kvKeys));
RangeResult data = wait(self->kvStore->readRange(kvKeys));
for (const auto& kv : data) {
reply
.snapshot[BinaryReader::fromStringRef<ConfigKey>(kv.key.removePrefix(kvKeys.begin), IncludeVersion())] =
ObjectReader::fromStringRef<KnobValue>(kv.value, IncludeVersion());
}
wait(store(reply.snapshotVersion, getLastCompactedVersion(self)));
wait(store(reply.changesVersion, getCommittedVersion(self)));
wait(store(reply.changes, getMutations(self, reply.snapshotVersion + 1, reply.changesVersion)));
wait(store(reply.annotations, getAnnotations(self, reply.snapshotVersion + 1, reply.changesVersion)));
wait(store(reply.changes, getMutations(self, reply.snapshotVersion + 1, req.mostRecentVersion)));
wait(store(reply.annotations, getAnnotations(self, reply.snapshotVersion + 1, req.mostRecentVersion)));
TraceEvent(SevDebug, "ConfigDatabaseNodeGettingSnapshot", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("ChangesVersion", reply.changesVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
@ -400,7 +392,7 @@ class SimpleConfigDatabaseNodeImpl {
// Apply mutations from the WAL in mutationKeys into the kvKeys key space.
// Periodic compaction prevents the database from growing too large, and improve read performance.
// However, commit annotations for compacted mutations are lost
ACTOR static Future<Void> compact(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerCompactRequest req) {
ACTOR static Future<Void> compact(ConfigNodeImpl* self, ConfigFollowerCompactRequest req) {
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
TraceEvent(SevDebug, "ConfigDatabaseNodeCompacting", self->id)
.detail("Version", req.version)
@ -443,7 +435,13 @@ class SimpleConfigDatabaseNodeImpl {
return Void();
}
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerInterface const* cfi) {
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
ConfigGeneration generation = wait(getGeneration(self));
req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion });
return Void();
}
ACTOR static Future<Void> serve(ConfigNodeImpl* self, ConfigFollowerInterface const* cfi) {
loop {
choose {
when(ConfigFollowerGetSnapshotAndChangesRequest req =
@ -458,22 +456,26 @@ class SimpleConfigDatabaseNodeImpl {
++self->compactRequests;
wait(compact(self, req));
}
when(ConfigFollowerGetCommittedVersionRequest req = waitNext(cfi->getCommittedVersion.getFuture())) {
++self->getCommittedVersionRequests;
wait(getCommittedVersion(self, req));
}
when(wait(self->kvStore->getError())) { ASSERT(false); }
}
}
}
public:
SimpleConfigDatabaseNodeImpl(std::string const& folder)
ConfigNodeImpl(std::string const& folder)
: id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), cc("ConfigDatabaseNode"),
compactRequests("CompactRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc),
successfulCommits("SuccessfulCommits", cc), failedCommits("FailedCommits", cc),
setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc),
failedCommits("FailedCommits", cc), setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
getValueRequests("GetValueRequests", cc), newVersionRequests("NewVersionRequests", cc) {
logger = traceCounters(
"ConfigDatabaseNodeMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigDatabaseNode");
TraceEvent(SevDebug, "StartingSimpleConfigDatabaseNode", id).detail("KVStoreAlreadyExists", kvStore.exists());
TraceEvent(SevDebug, "StartingConfigNode", id).detail("KVStoreAlreadyExists", kvStore.exists());
}
Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); }
@ -481,15 +483,14 @@ public:
Future<Void> serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); }
};
SimpleConfigDatabaseNode::SimpleConfigDatabaseNode(std::string const& folder)
: _impl(std::make_unique<SimpleConfigDatabaseNodeImpl>(folder)) {}
ConfigNode::ConfigNode(std::string const& folder) : _impl(std::make_unique<ConfigNodeImpl>(folder)) {}
SimpleConfigDatabaseNode::~SimpleConfigDatabaseNode() = default;
ConfigNode::~ConfigNode() = default;
Future<Void> SimpleConfigDatabaseNode::serve(ConfigTransactionInterface const& cti) {
Future<Void> ConfigNode::serve(ConfigTransactionInterface const& cti) {
return impl().serve(cti);
}
Future<Void> SimpleConfigDatabaseNode::serve(ConfigFollowerInterface const& cfi) {
Future<Void> ConfigNode::serve(ConfigFollowerInterface const& cfi) {
return impl().serve(cfi);
}

View File

@ -1,5 +1,5 @@
/*
* PaxosConfigDatabaseNode.h
* ConfigNode.h
*
* This source file is part of the FoundationDB open source project
*
@ -20,17 +20,19 @@
#pragma once
#include "fdbserver/IConfigDatabaseNode.h"
#include <string>
/*
* Fault-tolerant configuration database node implementation
*/
class PaxosConfigDatabaseNode : public IConfigDatabaseNode {
std::unique_ptr<class PaxosConfigDatabaseNodeImpl> impl;
#include "fdbclient/ConfigTransactionInterface.h"
#include "fdbserver/ConfigFollowerInterface.h"
class ConfigNode : public ReferenceCounted<ConfigNode> {
std::unique_ptr<class ConfigNodeImpl> _impl;
ConfigNodeImpl const& impl() const { return *_impl; }
ConfigNodeImpl& impl() { return *_impl; }
public:
PaxosConfigDatabaseNode(std::string const& folder);
~PaxosConfigDatabaseNode();
Future<Void> serve(ConfigTransactionInterface const&) override;
Future<Void> serve(ConfigFollowerInterface const&) override;
ConfigNode(std::string const& folder);
~ConfigNode();
Future<Void> serve(ConfigTransactionInterface const&);
Future<Void> serve(ConfigFollowerInterface const&);
};

View File

@ -20,7 +20,7 @@
#include "fdbclient/ConfigTransactionInterface.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/IConfigDatabaseNode.h"
#include "fdbserver/ConfigNode.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/OnDemandStore.h"
@ -727,7 +727,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder,
state OnDemandStore store(dataFolder, myID, "coordination-");
state ConfigTransactionInterface configTransactionInterface;
state ConfigFollowerInterface configFollowerInterface;
state Reference<IConfigDatabaseNode> configDatabaseNode;
state Reference<ConfigNode> configNode;
state Future<Void> configDatabaseServer = Never();
TraceEvent("CoordinationServer", myID)
.detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress())
@ -736,13 +736,9 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder,
if (useConfigDB != UseConfigDB::DISABLED) {
configTransactionInterface.setupWellKnownEndpoints();
configFollowerInterface.setupWellKnownEndpoints();
if (useConfigDB == UseConfigDB::SIMPLE) {
configDatabaseNode = IConfigDatabaseNode::createSimple(dataFolder);
} else {
configDatabaseNode = IConfigDatabaseNode::createPaxos(dataFolder);
}
configNode = makeReference<ConfigNode>(dataFolder);
configDatabaseServer =
configDatabaseNode->serve(configTransactionInterface) || configDatabaseNode->serve(configFollowerInterface);
configNode->serve(configTransactionInterface) || configNode->serve(configFollowerInterface);
}
try {

View File

@ -35,6 +35,7 @@ constexpr UID WLTOKEN_GENERATIONREG_WRITE(-1, 9);
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES(-1, 17);
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETCHANGES(-1, 18);
constexpr UID WLTOKEN_CONFIGFOLLOWER_COMPACT(-1, 19);
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION(-1, 20);
struct GenerationRegInterface {
constexpr static FileIdentifier file_identifier = 16726744;

View File

@ -1,31 +0,0 @@
/*
* IConfigDatabaseNode.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/IConfigDatabaseNode.h"
#include "fdbserver/PaxosConfigDatabaseNode.h"
#include "fdbserver/SimpleConfigDatabaseNode.h"
Reference<IConfigDatabaseNode> IConfigDatabaseNode::createSimple(std::string const& folder) {
return makeReference<SimpleConfigDatabaseNode>(folder);
}
Reference<IConfigDatabaseNode> IConfigDatabaseNode::createPaxos(std::string const& folder) {
return makeReference<PaxosConfigDatabaseNode>(folder);
}

View File

@ -1,41 +0,0 @@
/*
* IConfigDatabaseNode.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/ConfigTransactionInterface.h"
#include "fdbserver/ConfigFollowerInterface.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
#include <memory>
/*
* Interface for a single node in the configuration database, run on coordinators
*/
class IConfigDatabaseNode : public ReferenceCounted<IConfigDatabaseNode> {
public:
virtual ~IConfigDatabaseNode() = default;
virtual Future<Void> serve(ConfigTransactionInterface const&) = 0;
virtual Future<Void> serve(ConfigFollowerInterface const&) = 0;
static Reference<IConfigDatabaseNode> createSimple(std::string const& folder);
static Reference<IConfigDatabaseNode> createPaxos(std::string const& folder);
};

View File

@ -215,7 +215,7 @@ class LocalConfigurationImpl {
self->updateInMemoryState(lastSeenVersion);
return Void();
}
Standalone<RangeResultRef> range = wait(self->kvStore->readRange(knobOverrideKeys));
RangeResult range = wait(self->kvStore->readRange(knobOverrideKeys));
for (const auto& kv : range) {
auto configKey =
BinaryReader::fromStringRef<ConfigKey>(kv.key.removePrefix(knobOverrideKeys.begin), IncludeVersion());

View File

@ -20,25 +20,131 @@
#include "fdbserver/PaxosConfigConsumer.h"
class PaxosConfigConsumerImpl {};
class PaxosConfigConsumerImpl {
std::vector<ConfigFollowerInterface> cfis;
Version lastSeenVersion{ 0 };
double pollingInterval;
Optional<double> compactionInterval;
UID id;
PaxosConfigConsumer::PaxosConfigConsumer(ServerCoordinators const& cfi,
Optional<double> pollingInterval,
Optional<double> compactionInterval) {
// TODO: Implement
ASSERT(false);
}
ACTOR static Future<Version> getCommittedVersion(PaxosConfigConsumerImpl* self) {
state std::vector<Future<ConfigFollowerGetCommittedVersionReply>> committedVersionFutures;
committedVersionFutures.reserve(self->cfis.size());
for (const auto& cfi : self->cfis) {
committedVersionFutures.push_back(
cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}));
}
// FIXME: Must tolerate failure and disagreement
wait(waitForAll(committedVersionFutures));
return committedVersionFutures[0].get().version;
}
ACTOR static Future<Void> compactor(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
if (!self->compactionInterval.present()) {
wait(Never());
return Void();
}
loop {
state Version compactionVersion = self->lastSeenVersion;
wait(delayJittered(self->compactionInterval.get()));
std::vector<Future<Void>> compactionRequests;
compactionRequests.reserve(compactionRequests.size());
for (const auto& cfi : self->cfis) {
compactionRequests.push_back(cfi.compact.getReply(ConfigFollowerCompactRequest{ compactionVersion }));
}
try {
wait(timeoutError(waitForAll(compactionRequests), 1.0));
} catch (Error& e) {
TraceEvent(SevWarn, "ErrorSendingCompactionRequest").error(e);
}
}
}
ACTOR static Future<Void> getSnapshotAndChanges(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
state Version committedVersion = wait(getCommittedVersion(self));
// TODO: Load balance
ConfigFollowerGetSnapshotAndChangesReply reply = wait(self->cfis[0].getSnapshotAndChanges.getReply(
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }));
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesVersion", committedVersion)
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion;
broadcaster->applySnapshotAndChanges(
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, committedVersion, reply.annotations);
return Void();
}
ACTOR static Future<Void> fetchChanges(PaxosConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
wait(getSnapshotAndChanges(self, broadcaster));
loop {
try {
state Version committedVersion = wait(getCommittedVersion(self));
ASSERT_GE(committedVersion, self->lastSeenVersion);
if (committedVersion > self->lastSeenVersion) {
// TODO: Load balance
ConfigFollowerGetChangesReply reply = wait(self->cfis[0].getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }));
for (const auto& versionedMutation : reply.changes) {
TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
te.detail("Version", versionedMutation.version)
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
.detail("KnobName", versionedMutation.mutation.getKnobName());
if (versionedMutation.mutation.isSet()) {
te.detail("Op", "Set")
.detail("KnobValue", versionedMutation.mutation.getValue().toString());
} else {
te.detail("Op", "Clear");
}
}
self->lastSeenVersion = committedVersion;
broadcaster->applyChanges(reply.changes, committedVersion, reply.annotations);
}
wait(delayJittered(self->pollingInterval));
} catch (Error& e) {
if (e.code() == error_code_version_already_compacted) {
TEST(true); // SimpleConfigConsumer get version_already_compacted error
wait(getSnapshotAndChanges(self, broadcaster));
} else {
throw e;
}
}
}
}
public:
Future<Void> consume(ConfigBroadcaster& broadcaster) {
return fetchChanges(this, &broadcaster) || compactor(this, &broadcaster);
}
UID getID() const { return id; }
PaxosConfigConsumerImpl(std::vector<ConfigFollowerInterface> const& cfis,
double pollingInterval,
Optional<double> compactionInterval)
: cfis(cfis), pollingInterval(pollingInterval), compactionInterval(compactionInterval),
id(deterministicRandom()->randomUniqueID()) {}
};
PaxosConfigConsumer::PaxosConfigConsumer(std::vector<ConfigFollowerInterface> const& cfis,
double pollingInterval,
Optional<double> compactionInterval)
: _impl(std::make_unique<PaxosConfigConsumerImpl>(cfis, pollingInterval, compactionInterval)) {}
PaxosConfigConsumer::PaxosConfigConsumer(ServerCoordinators const& coordinators,
double pollingInterval,
Optional<double> compactionInterval)
: _impl(std::make_unique<PaxosConfigConsumerImpl>(coordinators.configServers, pollingInterval, compactionInterval)) {}
PaxosConfigConsumer::~PaxosConfigConsumer() = default;
Future<Void> PaxosConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
// TODO: Implement
ASSERT(false);
return Void();
return impl().consume(broadcaster);
}
UID PaxosConfigConsumer::getID() const {
// TODO: Implement
ASSERT(false);
return {};
return impl().getID();
}

View File

@ -31,10 +31,15 @@ class PaxosConfigConsumer : public IConfigConsumer {
PaxosConfigConsumerImpl& impl() { return *_impl; }
public:
PaxosConfigConsumer(ServerCoordinators const& cfi,
Optional<double> pollingInterval,
PaxosConfigConsumer(ServerCoordinators const& coordinators,
double pollingInterval,
Optional<double> compactionInterval);
~PaxosConfigConsumer();
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
UID getID() const override;
public: // Testing
PaxosConfigConsumer(std::vector<ConfigFollowerInterface> const& cfis,
double pollingInterval,
Optional<double> compactionInterval);
};

View File

@ -1,42 +0,0 @@
/*
* PaxosConfigDatabaseNode.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/PaxosConfigDatabaseNode.h"
class PaxosConfigDatabaseNodeImpl {};
PaxosConfigDatabaseNode::PaxosConfigDatabaseNode(std::string const& folder) {
// TODO: Implement
ASSERT(false);
}
PaxosConfigDatabaseNode::~PaxosConfigDatabaseNode() = default;
Future<Void> PaxosConfigDatabaseNode::serve(ConfigTransactionInterface const& cti) {
// TODO: Implement
ASSERT(false);
return Void();
}
Future<Void> PaxosConfigDatabaseNode::serve(ConfigFollowerInterface const& cfi) {
// TODO: Implement
ASSERT(false);
return Void();
}

View File

@ -49,28 +49,36 @@ class SimpleConfigConsumerImpl {
}
}
ACTOR static Future<Version> getCommittedVersion(SimpleConfigConsumerImpl* self) {
ConfigFollowerGetCommittedVersionReply committedVersionReply =
wait(self->cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}));
return committedVersionReply.version;
}
ACTOR static Future<Void> fetchChanges(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
wait(getSnapshotAndChanges(self, broadcaster));
loop {
try {
ConfigFollowerGetChangesReply reply =
wait(self->cfi.getChanges.getReply(ConfigFollowerGetChangesRequest{ self->lastSeenVersion }));
++self->successfulChangeRequest;
for (const auto& versionedMutation : reply.changes) {
TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
te.detail("Version", versionedMutation.version)
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
.detail("KnobName", versionedMutation.mutation.getKnobName());
if (versionedMutation.mutation.isSet()) {
te.detail("Op", "Set").detail("KnobValue", versionedMutation.mutation.getValue().toString());
} else {
te.detail("Op", "Clear");
state Version committedVersion = wait(getCommittedVersion(self));
ASSERT_GE(committedVersion, self->lastSeenVersion);
if (committedVersion > self->lastSeenVersion) {
ConfigFollowerGetChangesReply reply = wait(self->cfi.getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }));
++self->successfulChangeRequest;
for (const auto& versionedMutation : reply.changes) {
TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
te.detail("Version", versionedMutation.version)
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
.detail("KnobName", versionedMutation.mutation.getKnobName());
if (versionedMutation.mutation.isSet()) {
te.detail("Op", "Set")
.detail("KnobValue", versionedMutation.mutation.getValue().toString());
} else {
te.detail("Op", "Clear");
}
}
}
ASSERT_GE(reply.mostRecentVersion, self->lastSeenVersion);
if (reply.mostRecentVersion > self->lastSeenVersion) {
self->lastSeenVersion = reply.mostRecentVersion;
broadcaster->applyChanges(reply.changes, reply.mostRecentVersion, reply.annotations);
self->lastSeenVersion = committedVersion;
broadcaster->applyChanges(reply.changes, committedVersion, reply.annotations);
}
wait(delayJittered(self->pollingInterval));
} catch (Error& e) {
@ -86,19 +94,20 @@ class SimpleConfigConsumerImpl {
}
ACTOR static Future<Void> getSnapshotAndChanges(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
ConfigFollowerGetSnapshotAndChangesReply reply =
wait(self->cfi.getSnapshotAndChanges.getReply(ConfigFollowerGetSnapshotAndChangesRequest{}));
state Version committedVersion = wait(getCommittedVersion(self));
ConfigFollowerGetSnapshotAndChangesReply reply = wait(
self->cfi.getSnapshotAndChanges.getReply(ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }));
++self->snapshotRequest;
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
.detail("ChangesVersion", reply.changesVersion)
.detail("ChangesVersion", committedVersion)
.detail("ChangesSize", reply.changes.size())
.detail("AnnotationsSize", reply.annotations.size());
ASSERT_GE(committedVersion, self->lastSeenVersion);
self->lastSeenVersion = committedVersion;
broadcaster->applySnapshotAndChanges(
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, reply.changesVersion, reply.annotations);
ASSERT_GE(reply.changesVersion, self->lastSeenVersion);
self->lastSeenVersion = reply.changesVersion;
std::move(reply.snapshot), reply.snapshotVersion, reply.changes, committedVersion, reply.annotations);
return Void();
}

View File

@ -1,40 +0,0 @@
/*
* SimpleConfigDatabaseNode.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbserver/IConfigDatabaseNode.h"
/*
* A test-only configuration database node implementation that assumes all data is stored on a single coordinator.
* As such, there is no need to handle rolling forward or rolling back mutations, because this one node is considered
* the source of truth.
*/
class SimpleConfigDatabaseNode : public IConfigDatabaseNode {
std::unique_ptr<class SimpleConfigDatabaseNodeImpl> _impl;
SimpleConfigDatabaseNodeImpl const& impl() const { return *_impl; }
SimpleConfigDatabaseNodeImpl& impl() { return *_impl; }
public:
SimpleConfigDatabaseNode(std::string const& folder);
~SimpleConfigDatabaseNode();
Future<Void> serve(ConfigTransactionInterface const&) override;
Future<Void> serve(ConfigFollowerInterface const&) override;
};