1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-28 02:48:09 +08:00

Add rollforward capability to ConfigNode

This commit is contained in:
Lukas Joswiak 2021-08-25 16:21:22 -07:00
parent 919d2566e0
commit a79aea108c
3 changed files with 125 additions and 21 deletions

@ -110,6 +110,10 @@ public:
Future<Void> rollback(Version version) { return cfi.rollback.getReply(ConfigFollowerRollbackRequest{ version }); }
Future<Void> rollforward(Version version, Standalone<VectorRef<VersionedConfigMutationRef>> mutations) {
return cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{ version, mutations });
}
void restartNode() {
cfiServer.cancel();
ctiServer.cancel();
@ -407,6 +411,9 @@ public:
Future<Void> compact() { return writeTo.compact(); }
Future<Void> rollback(Version version) { return writeTo.rollback(version); }
Future<Void> rollforward(Version version, Standalone<VectorRef<VersionedConfigMutationRef>> mutations) {
return writeTo.rollforward(version, mutations);
}
Future<Void> getError() const { return writeTo.getError(); }
};
@ -518,6 +525,10 @@ template <class Env, class... Args>
Future<Void> rollback(Env& env, Args&&... args) {
return waitOrError(env.rollback(std::forward<Args>(args)...), env.getError());
}
template <class Env, class... Args>
Future<Void> rollforward(Env& env, Args&&... args) {
return waitOrError(env.rollforward(std::forward<Args>(args)...), env.getError());
}
ACTOR template <class Env>
Future<Void> testRestartLocalConfig(UnitTestParameters params) {
@ -941,6 +952,44 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/RollbackToNewerVersion") {
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/Rollforward") {
state TransactionEnvironment env(params.getDataDir());
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
appendVersionedMutation(
mutations, 0, "class-A"_sr, "test_long_v0"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
appendVersionedMutation(
mutations, 1, "class-B"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
wait(rollforward(env, 1, mutations));
wait(check(env, "class-A"_sr, "test_long_v0"_sr, Optional<int64_t>{ 1 }));
wait(check(env, "class-B"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithExistingMutations") {
state TransactionEnvironment env(params.getDataDir());
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
appendVersionedMutation(
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
wait(rollforward(env, 1, mutations));
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{ 1 }));
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithInvalidMutation") {
state TransactionEnvironment env(params.getDataDir());
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
appendVersionedMutation(
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
appendVersionedMutation(
mutations, 10, "class-A"_sr, "test_long_v10"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
wait(rollforward(env, 5, mutations));
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 1 }));
wait(check(env, "class-A"_sr, "test_long_v10"_sr, Optional<int64_t>{}));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/GetConfigClasses") {
wait(testGetConfigClasses(params, false));
return Void();

@ -164,6 +164,23 @@ struct ConfigFollowerRollbackRequest {
}
};
struct ConfigFollowerRollforwardRequest {
static constexpr FileIdentifier file_identifier = 678894;
Version version{ 0 };
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
ReplyPromise<Void> reply;
ConfigFollowerRollforwardRequest() = default;
explicit ConfigFollowerRollforwardRequest(Version version,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations)
: version(version), mutations(mutations) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, mutations, reply);
}
};
struct ConfigFollowerGetCommittedVersionReply {
static constexpr FileIdentifier file_identifier = 9214735;
Version version;
@ -200,6 +217,7 @@ public:
RequestStream<ConfigFollowerGetChangesRequest> getChanges;
RequestStream<ConfigFollowerCompactRequest> compact;
RequestStream<ConfigFollowerRollbackRequest> rollback;
RequestStream<ConfigFollowerRollforwardRequest> rollforward;
RequestStream<ConfigFollowerGetCommittedVersionRequest> getCommittedVersion;
ConfigFollowerInterface();
@ -211,6 +229,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollback, getCommittedVersion);
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollback, rollforward, getCommittedVersion);
}
};

@ -316,6 +316,38 @@ class ConfigNodeImpl {
return Void();
}
ACTOR static Future<Void> commitMutations(ConfigNodeImpl* self,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
std::unordered_map<Version, int> versionIndex,
Version commitVersion,
ConfigCommitAnnotationRef annotation) {
for (const auto& mutation : mutations) {
if (mutation.version > commitVersion) {
continue;
}
Key key = versionedMutationKey(mutation.version, versionIndex[mutation.version]++);
Value value = ObjectWriter::toValue(mutation.mutation, IncludeVersion());
if (mutation.mutation.isSet()) {
TraceEvent("ConfigNodeSetting")
.detail("ConfigClass", mutation.mutation.getConfigClass())
.detail("KnobName", mutation.mutation.getKnobName())
.detail("Value", mutation.mutation.getValue().toString())
.detail("Version", mutation.version);
++self->setMutations;
} else {
++self->clearMutations;
}
self->kvStore->set(KeyValueRef(key, value));
}
self->kvStore->set(
KeyValueRef(versionedAnnotationKey(commitVersion), BinaryWriter::toValue(annotation, IncludeVersion())));
ConfigGeneration newGeneration = { commitVersion, commitVersion };
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion())));
wait(self->kvStore->commit());
++self->successfulCommits;
return Void();
}
ACTOR static Future<Void> commit(ConfigNodeImpl* self, ConfigTransactionCommitRequest req) {
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation.committedVersion != currentGeneration.committedVersion) {
@ -327,28 +359,11 @@ class ConfigNodeImpl {
req.reply.sendError(not_committed());
return Void();
}
int index = 0;
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
for (const auto& mutation : req.mutations) {
Key key = versionedMutationKey(req.generation.liveVersion, index++);
Value value = ObjectWriter::toValue(mutation, IncludeVersion());
if (mutation.isSet()) {
TraceEvent("ConfigNodeSetting")
.detail("ConfigClass", mutation.getConfigClass())
.detail("KnobName", mutation.getKnobName())
.detail("Value", mutation.getValue().toString())
.detail("Version", req.generation.liveVersion);
++self->setMutations;
} else {
++self->clearMutations;
}
self->kvStore->set(KeyValueRef(key, value));
mutations.emplace_back_deep(mutations.arena(), req.generation.liveVersion, mutation);
}
self->kvStore->set(KeyValueRef(versionedAnnotationKey(req.generation.liveVersion),
BinaryWriter::toValue(req.annotation, IncludeVersion())));
ConfigGeneration newGeneration = { req.generation.liveVersion, req.generation.liveVersion };
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion())));
wait(self->kvStore->commit());
++self->successfulCommits;
wait(commitMutations(self, mutations, {}, req.generation.liveVersion, req.annotation));
req.reply.send(Void());
return Void();
}
@ -446,6 +461,7 @@ class ConfigNodeImpl {
}
ACTOR static Future<Void> rollback(ConfigNodeImpl* self, ConfigFollowerRollbackRequest req) {
// TODO: Actaully delete mutations from kvstore (similar to compact)
state ConfigGeneration generation = wait(getGeneration(self));
if (req.version < generation.committedVersion) {
generation.committedVersion = req.version;
@ -455,6 +471,23 @@ class ConfigNodeImpl {
return Void();
}
ACTOR static Future<Void> rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) {
state std::unordered_map<Version, int> versionIndex;
state int i;
for (i = 0; i < req.mutations.size(); ++i) {
state VersionedConfigMutationRef mutation = req.mutations[i];
if (versionIndex.find(mutation.version) == versionIndex.end()) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, mutation.version, mutation.version));
versionIndex[mutation.version] = versionedMutations.size();
}
}
wait(commitMutations(
self, req.mutations, versionIndex, req.version, ConfigCommitAnnotationRef("rollforward"_sr, now())));
req.reply.send(Void());
return Void();
}
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
ConfigGeneration generation = wait(getGeneration(self));
req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion });
@ -480,6 +513,10 @@ class ConfigNodeImpl {
++self->rollbackRequests;
wait(rollback(self, req));
}
when(ConfigFollowerRollforwardRequest req = waitNext(cfi->rollforward.getFuture())) {
++self->rollforwardRequests;
wait(rollforward(self, req));
}
when(ConfigFollowerGetCommittedVersionRequest req = waitNext(cfi->getCommittedVersion.getFuture())) {
++self->getCommittedVersionRequests;
wait(getCommittedVersion(self, req));