1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-28 10:52:03 +08:00

Overwrite existing mutations with rollforward mutations, use existing annotations

This commit is contained in:
Lukas Joswiak 2021-08-26 12:17:58 -07:00
parent b3a633b7d4
commit 9c561f49d9
3 changed files with 64 additions and 32 deletions

@ -110,8 +110,12 @@ public:
Future<Void> rollback(Version version) { return cfi.rollback.getReply(ConfigFollowerRollbackRequest{ version }); }
Future<Void> rollforward(Version version, Standalone<VectorRef<VersionedConfigMutationRef>> mutations) {
return cfi.rollforward.getReply(ConfigFollowerRollforwardRequest{ version, mutations });
Future<Void> rollforward(Version beginVersion,
Version endVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
const std::map<Version, ConfigCommitAnnotationRef>& annotations) {
return cfi.rollforward.getReply(
ConfigFollowerRollforwardRequest{ beginVersion, endVersion, mutations, annotations });
}
void restartNode() {
@ -411,8 +415,11 @@ public:
Future<Void> compact() { return writeTo.compact(); }
Future<Void> rollback(Version version) { return writeTo.rollback(version); }
Future<Void> rollforward(Version version, Standalone<VectorRef<VersionedConfigMutationRef>> mutations) {
return writeTo.rollforward(version, mutations);
Future<Void> rollforward(Version beginVersion,
Version endVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
const std::map<Version, ConfigCommitAnnotationRef>& annotations) {
return writeTo.rollforward(beginVersion, endVersion, mutations, annotations);
}
Future<Void> getError() const { return writeTo.getError(); }
};
@ -959,21 +966,33 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/Rollforward") {
mutations, 0, "class-A"_sr, "test_long_v0"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
appendVersionedMutation(
mutations, 1, "class-B"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
wait(rollforward(env, 1, mutations));
std::map<Version, ConfigCommitAnnotationRef> annotations = {
{ 0, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } },
{ 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } }
};
wait(rollforward(env, 0, 1, mutations, annotations));
wait(check(env, "class-A"_sr, "test_long_v0"_sr, Optional<int64_t>{ 1 }));
wait(check(env, "class-B"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithExistingMutations") {
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithExistingMutation") {
state TransactionEnvironment env(params.getDataDir());
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
appendVersionedMutation(
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
wait(rollforward(env, 1, mutations));
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{ 1 }));
appendVersionedMutation(
mutations, 2, "class-A"_sr, "test_long_v2"_sr, KnobValueRef::create(int64_t{ 3 }).contents());
std::map<Version, ConfigCommitAnnotationRef> annotations = {
{ 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } },
{ 2, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } }
};
wait(rollforward(env, 1, 2, mutations, annotations));
// Existing mutations will be overwritten by the rollforward request.
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{}));
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 2 }));
wait(check(env, "class-A"_sr, "test_long_v2"_sr, Optional<int64_t>{ 3 }));
return Void();
}
@ -984,7 +1003,10 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/RollforwardWithInvalidMutation") {
mutations, 1, "class-A"_sr, "test_long_v1"_sr, KnobValueRef::create(int64_t{ 1 }).contents());
appendVersionedMutation(
mutations, 10, "class-A"_sr, "test_long_v10"_sr, KnobValueRef::create(int64_t{ 2 }).contents());
wait(rollforward(env, 5, mutations));
std::map<Version, ConfigCommitAnnotationRef> annotations = {
{ 1, ConfigCommitAnnotationRef{ "unit_test"_sr, now() } }
};
wait(rollforward(env, 0, 5, mutations, annotations));
wait(check(env, "class-A"_sr, "test_long_v1"_sr, Optional<int64_t>{ 1 }));
wait(check(env, "class-A"_sr, "test_long_v10"_sr, Optional<int64_t>{}));
return Void();

@ -166,18 +166,22 @@ struct ConfigFollowerRollbackRequest {
struct ConfigFollowerRollforwardRequest {
static constexpr FileIdentifier file_identifier = 678894;
Version version{ 0 };
Version beginVersion{ 0 };
Version endVersion{ 0 };
Standalone<VectorRef<VersionedConfigMutationRef>> mutations;
std::map<Version, ConfigCommitAnnotationRef> annotations;
ReplyPromise<Void> reply;
ConfigFollowerRollforwardRequest() = default;
explicit ConfigFollowerRollforwardRequest(Version version,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations)
: version(version), mutations(mutations) {}
explicit ConfigFollowerRollforwardRequest(Version beginVersion,
Version endVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
std::map<Version, ConfigCommitAnnotationRef> annotations)
: beginVersion(beginVersion), endVersion(endVersion), mutations(mutations), annotations(annotations) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, mutations, reply);
serializer(ar, beginVersion, endVersion, mutations, annotations, reply);
}
};

@ -318,14 +318,22 @@ class ConfigNodeImpl {
ACTOR static Future<Void> commitMutations(ConfigNodeImpl* self,
Standalone<VectorRef<VersionedConfigMutationRef>> mutations,
std::unordered_map<Version, int> versionIndex,
Version commitVersion,
ConfigCommitAnnotationRef annotation) {
std::map<Version, ConfigCommitAnnotationRef> annotations,
Version commitVersion) {
Version latestVersion = 0;
int index = 0;
for (const auto& mutation : mutations) {
if (mutation.version > commitVersion) {
continue;
}
Key key = versionedMutationKey(mutation.version, versionIndex[mutation.version]++);
// Mutations should be in ascending version order.
ASSERT(mutation.version >= latestVersion);
if (mutation.version > latestVersion) {
latestVersion = mutation.version;
index = 0;
}
ASSERT(annotations.find(mutation.version) != annotations.end());
Key key = versionedMutationKey(mutation.version, index++);
Value value = ObjectWriter::toValue(mutation.mutation, IncludeVersion());
if (mutation.mutation.isSet()) {
TraceEvent("ConfigNodeSetting")
@ -339,8 +347,10 @@ class ConfigNodeImpl {
}
self->kvStore->set(KeyValueRef(key, value));
}
self->kvStore->set(
KeyValueRef(versionedAnnotationKey(commitVersion), BinaryWriter::toValue(annotation, IncludeVersion())));
for (const auto& [version, annotation] : annotations) {
self->kvStore->set(
KeyValueRef(versionedAnnotationKey(version), BinaryWriter::toValue(annotation, IncludeVersion())));
}
ConfigGeneration newGeneration = { commitVersion, commitVersion };
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(newGeneration, IncludeVersion())));
wait(self->kvStore->commit());
@ -363,7 +373,8 @@ class ConfigNodeImpl {
for (const auto& mutation : req.mutations) {
mutations.emplace_back_deep(mutations.arena(), req.generation.liveVersion, mutation);
}
wait(commitMutations(self, mutations, {}, req.generation.liveVersion, req.annotation));
wait(commitMutations(
self, mutations, { { req.generation.liveVersion, req.annotation } }, req.generation.liveVersion));
req.reply.send(Void());
return Void();
}
@ -472,18 +483,13 @@ class ConfigNodeImpl {
}
ACTOR static Future<Void> rollforward(ConfigNodeImpl* self, ConfigFollowerRollforwardRequest req) {
state std::unordered_map<Version, int> versionIndex;
state int i;
for (i = 0; i < req.mutations.size(); ++i) {
state VersionedConfigMutationRef mutation = req.mutations[i];
if (versionIndex.find(mutation.version) == versionIndex.end()) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, mutation.version, mutation.version));
versionIndex[mutation.version] = versionedMutations.size();
}
ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.beginVersion != currentGeneration.committedVersion) {
++self->failedCommits;
req.reply.sendError(transaction_too_old());
return Void();
}
wait(commitMutations(
self, req.mutations, versionIndex, req.version, ConfigCommitAnnotationRef("rollforward"_sr, now())));
wait(commitMutations(self, req.mutations, req.annotations, req.endVersion));
req.reply.send(Void());
return Void();
}