mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 18:56:00 +08:00
Add rollback capability to ConfigNode
This commit is contained in:
parent
1d36c18a68
commit
919d2566e0
@ -108,6 +108,8 @@ public:
|
||||
|
||||
Future<Void> compact() { return cfi.compact.getReply(ConfigFollowerCompactRequest{ lastWrittenVersion }); }
|
||||
|
||||
Future<Void> rollback(Version version) { return cfi.rollback.getReply(ConfigFollowerRollbackRequest{ version }); }
|
||||
|
||||
void restartNode() {
|
||||
cfiServer.cancel();
|
||||
ctiServer.cancel();
|
||||
@ -404,6 +406,7 @@ public:
|
||||
}
|
||||
|
||||
Future<Void> compact() { return writeTo.compact(); }
|
||||
Future<Void> rollback(Version version) { return writeTo.rollback(version); }
|
||||
Future<Void> getError() const { return writeTo.getError(); }
|
||||
};
|
||||
|
||||
@ -511,6 +514,10 @@ Future<Void> compact(BroadcasterToLocalConfigEnvironment& env) {
|
||||
env.compact();
|
||||
return Void();
|
||||
}
|
||||
template <class Env, class... Args>
|
||||
Future<Void> rollback(Env& env, Args&&... args) {
|
||||
return waitOrError(env.rollback(std::forward<Args>(args)...), env.getError());
|
||||
}
|
||||
|
||||
ACTOR template <class Env>
|
||||
Future<Void> testRestartLocalConfig(UnitTestParameters params) {
|
||||
@ -904,6 +911,36 @@ TEST_CASE("/fdbserver/ConfigDB/Transaction/CompactNode") {
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/Transaction/Rollback") {
|
||||
state TransactionEnvironment env(params.getDataDir());
|
||||
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
|
||||
// Rollback to version 0 should undo the set.
|
||||
wait(rollback(env, 0));
|
||||
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{}));
|
||||
// Make sure sets still work after rollback.
|
||||
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 2 }));
|
||||
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{ 2 }));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollbackToCurrentVersion") {
|
||||
state TransactionEnvironment env(params.getDataDir());
|
||||
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
|
||||
// Rollback to the latest written version shouldn't undo anything.
|
||||
wait(rollback(env, 1));
|
||||
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{ 1 }));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/Transaction/RollbackToNewerVersion") {
|
||||
state TransactionEnvironment env(params.getDataDir());
|
||||
wait(set(env, "class-A"_sr, "test_long"_sr, int64_t{ 1 }));
|
||||
// Rollback to the a version not yet committed should have no effect.
|
||||
wait(rollback(env, 999));
|
||||
wait(check(env, "class-A"_sr, "test_long"_sr, Optional<int64_t>{ 1 }));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/Transaction/GetConfigClasses") {
|
||||
wait(testGetConfigClasses(params, false));
|
||||
return Void();
|
||||
|
@ -150,6 +150,20 @@ struct ConfigFollowerCompactRequest {
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerRollbackRequest {
|
||||
static constexpr FileIdentifier file_identifier = 765456;
|
||||
Version version{ 0 };
|
||||
ReplyPromise<Void> reply;
|
||||
|
||||
ConfigFollowerRollbackRequest() = default;
|
||||
explicit ConfigFollowerRollbackRequest(Version version) : version(version) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetCommittedVersionReply {
|
||||
static constexpr FileIdentifier file_identifier = 9214735;
|
||||
Version version;
|
||||
@ -185,6 +199,7 @@ public:
|
||||
RequestStream<ConfigFollowerGetSnapshotAndChangesRequest> getSnapshotAndChanges;
|
||||
RequestStream<ConfigFollowerGetChangesRequest> getChanges;
|
||||
RequestStream<ConfigFollowerCompactRequest> compact;
|
||||
RequestStream<ConfigFollowerRollbackRequest> rollback;
|
||||
RequestStream<ConfigFollowerGetCommittedVersionRequest> getCommittedVersion;
|
||||
|
||||
ConfigFollowerInterface();
|
||||
@ -196,6 +211,6 @@ public:
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, getCommittedVersion);
|
||||
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact, rollback, getCommittedVersion);
|
||||
}
|
||||
};
|
||||
|
@ -100,6 +100,8 @@ class ConfigNodeImpl {
|
||||
|
||||
// Follower counters
|
||||
Counter compactRequests;
|
||||
Counter rollbackRequests;
|
||||
Counter rollforwardRequests;
|
||||
Counter successfulChangeRequests;
|
||||
Counter failedChangeRequests;
|
||||
Counter snapshotRequests;
|
||||
@ -443,6 +445,16 @@ class ConfigNodeImpl {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> rollback(ConfigNodeImpl* self, ConfigFollowerRollbackRequest req) {
|
||||
state ConfigGeneration generation = wait(getGeneration(self));
|
||||
if (req.version < generation.committedVersion) {
|
||||
generation.committedVersion = req.version;
|
||||
self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion())));
|
||||
}
|
||||
req.reply.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getCommittedVersion(ConfigNodeImpl* self, ConfigFollowerGetCommittedVersionRequest req) {
|
||||
ConfigGeneration generation = wait(getGeneration(self));
|
||||
req.reply.send(ConfigFollowerGetCommittedVersionReply{ generation.committedVersion });
|
||||
@ -464,6 +476,10 @@ class ConfigNodeImpl {
|
||||
++self->compactRequests;
|
||||
wait(compact(self, req));
|
||||
}
|
||||
when(ConfigFollowerRollbackRequest req = waitNext(cfi->rollback.getFuture())) {
|
||||
++self->rollbackRequests;
|
||||
wait(rollback(self, req));
|
||||
}
|
||||
when(ConfigFollowerGetCommittedVersionRequest req = waitNext(cfi->getCommittedVersion.getFuture())) {
|
||||
++self->getCommittedVersionRequests;
|
||||
wait(getCommittedVersion(self, req));
|
||||
@ -476,7 +492,8 @@ class ConfigNodeImpl {
|
||||
public:
|
||||
ConfigNodeImpl(std::string const& folder)
|
||||
: id(deterministicRandom()->randomUniqueID()), kvStore(folder, id, "globalconf-"), cc("ConfigNode"),
|
||||
compactRequests("CompactRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
|
||||
compactRequests("CompactRequests", cc), rollbackRequests("RollbackRequests", cc),
|
||||
rollforwardRequests("RollforwardRequests", cc), successfulChangeRequests("SuccessfulChangeRequests", cc),
|
||||
failedChangeRequests("FailedChangeRequests", cc), snapshotRequests("SnapshotRequests", cc),
|
||||
getCommittedVersionRequests("GetCommittedVersionRequests", cc), successfulCommits("SuccessfulCommits", cc),
|
||||
failedCommits("FailedCommits", cc), setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
|
||||
|
Loading…
x
Reference in New Issue
Block a user