Retry messages to well-known endpoints, add notes for future work

This commit is contained in:
Lukas Joswiak 2021-10-21 12:32:09 -07:00 committed by Trevor Clinkenbeard
parent 92998fd20b
commit 57c2cf4a24
2 changed files with 25 additions and 10 deletions

View File

@ -186,7 +186,15 @@ class ConfigNodeImpl {
}
state Version committedVersion =
wait(map(getGeneration(self), [](auto const& gen) { return gen.committedVersion; }));
ASSERT(req.lastSeenVersion < committedVersion);
// TODO: Reenable this when running the ConfigIncrement workload with reboot=false
// if (committedVersion < req.mostRecentVersion) {
// // Handle a very rare case where a ConfigNode loses data between
// // responding with a committed version and responding to the
// // subsequent get changes request.
// TEST(true); // ConfigNode data loss occurred on a minority of coordinators
// req.reply.sendError(process_behind()); // Reuse the process_behind error
// return Void();
// }
state Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, req.lastSeenVersion + 1, committedVersion));
state Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> versionedAnnotations =

View File

@ -76,7 +76,9 @@ class GetCommittedVersionQuorum {
// Now roll node forward to match the largest committed version of
// the replies.
// TODO: Load balance over quorum
// TODO: Load balance over quorum. Also need to catch
// error_code_process_behind and retry with the next ConfigNode in
// the quorum.
state ConfigFollowerInterface quorumCfi = self->replies[self->largestCommitted][0];
try {
ConfigFollowerGetChangesReply reply = wait(retryBrokenPromise(
@ -111,7 +113,6 @@ class GetCommittedVersionQuorum {
ACTOR static Future<Void> getCommittedVersionActor(GetCommittedVersionQuorum* self, ConfigFollowerInterface cfi) {
try {
// TODO: Timeout value should be a variable/field
ConfigFollowerGetCommittedVersionReply reply =
wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}),
getCommittedVersionTimeout));
@ -175,7 +176,10 @@ class GetCommittedVersionQuorum {
// occurred, the quorum version hasn't been set, and there are
// no more incoming responses. Note that this means that it is
// impossible to reach a quorum, so send back the largest
// committed version seen.
// committed version seen. We also need to store the interface
// for the timed-out server for future communication attempts.
auto& nodes = self->replies[self->largestCommitted];
nodes.push_back(cfi);
self->quorumVersion.send(CommittedVersions{ self->lastSeenVersion, self->largestCommitted });
}
}
@ -241,8 +245,8 @@ class PaxosConfigConsumerImpl {
state Version committedVersion = wait(getCommittedVersion(self));
// TODO: Load balance
ConfigFollowerGetSnapshotAndChangesReply reply =
wait(self->getCommittedVersionQuorum.getReadReplicas()[0].getSnapshotAndChanges.getReply(
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }));
wait(retryBrokenPromise(self->getCommittedVersionQuorum.getReadReplicas()[0].getSnapshotAndChanges,
ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }));
TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id)
.detail("SnapshotVersion", reply.snapshotVersion)
.detail("SnapshotSize", reply.snapshot.size())
@ -271,10 +275,11 @@ class PaxosConfigConsumerImpl {
// ConfigNodes changes to 1, 1, 2, the committed version
// returned would be 1.
if (committedVersion > self->lastSeenVersion) {
// TODO: Load balance
ConfigFollowerGetChangesReply reply =
wait(self->getCommittedVersionQuorum.getReadReplicas()[0].getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }));
// TODO: Load balance to avoid always hitting the
// node at index 0 first
ConfigFollowerGetChangesReply reply = wait(
retryBrokenPromise(self->getCommittedVersionQuorum.getReadReplicas()[0].getChanges,
ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }));
for (const auto& versionedMutation : reply.changes) {
TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id);
te.detail("Version", versionedMutation.version)
@ -289,6 +294,8 @@ class PaxosConfigConsumerImpl {
}
self->lastSeenVersion = committedVersion;
broadcaster->applyChanges(reply.changes, committedVersion, reply.annotations);
// TODO: Catch error_code_process_behind and retry with
// the next ConfigNode in the quorum.
}
wait(delayJittered(self->pollingInterval));
} catch (Error& e) {