diff --git a/fdbclient/ConfigTransactionInterface.cpp b/fdbclient/ConfigTransactionInterface.cpp index 66618e01d7..1e07395ecf 100644 --- a/fdbclient/ConfigTransactionInterface.cpp +++ b/fdbclient/ConfigTransactionInterface.cpp @@ -55,6 +55,22 @@ bool ConfigGeneration::operator!=(ConfigGeneration const& rhs) const { return !(*this == rhs); } +bool ConfigGeneration::operator<(ConfigGeneration const& rhs) const { + if (committedVersion != rhs.committedVersion) { + return committedVersion < rhs.committedVersion; + } else { + return liveVersion < rhs.liveVersion; + } +} + +bool ConfigGeneration::operator>(ConfigGeneration const& rhs) const { + if (committedVersion != rhs.committedVersion) { + return committedVersion > rhs.committedVersion; + } else { + return liveVersion > rhs.liveVersion; + } +} + void ConfigTransactionCommitRequest::set(KeyRef key, ValueRef value) { if (key == configTransactionDescriptionKey) { annotation.description = KeyRef(arena, value); diff --git a/fdbclient/ConfigTransactionInterface.h b/fdbclient/ConfigTransactionInterface.h index ff85760a3f..a9c9d2995a 100644 --- a/fdbclient/ConfigTransactionInterface.h +++ b/fdbclient/ConfigTransactionInterface.h @@ -28,18 +28,20 @@ #include "flow/flow.h" struct ConfigGeneration { - // The live version of each node is monotonically increasing - Version liveVersion{ 0 }; // The committedVersion of each node is the version of the last commit made durable. // Each committedVersion was previously given to clients as a liveVersion, prior to commit. Version committedVersion{ 0 }; + // The live version of each node is monotonically increasing + Version liveVersion{ 0 }; bool operator==(ConfigGeneration const&) const; bool operator!=(ConfigGeneration const&) const; + bool operator<(ConfigGeneration const&) const; + bool operator>(ConfigGeneration const&) const; template <class Ar> void serialize(Ar& ar) { - serializer(ar, liveVersion, committedVersion); + serializer(ar, committedVersion, liveVersion); } }; @@ -57,12 +59,16 @@ struct ConfigTransactionGetGenerationReply { struct ConfigTransactionGetGenerationRequest { static constexpr FileIdentifier file_identifier = 138941; + // A hint to catch up lagging nodes: + Optional<Version> lastSeenLiveVersion; ReplyPromise<ConfigTransactionGetGenerationReply> reply; ConfigTransactionGetGenerationRequest() = default; + explicit ConfigTransactionGetGenerationRequest(Optional<Version> const& lastSeenLiveVersion) + : lastSeenLiveVersion(lastSeenLiveVersion) {} template <class Ar> void serialize(Ar& ar) { - serializer(ar, reply); + serializer(ar, lastSeenLiveVersion, reply); } }; diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index 7b869e72c4..0143aa949e 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -22,24 +22,81 @@ #include "fdbclient/PaxosConfigTransaction.h" #include "flow/actorcompiler.h" // must be last include +namespace { + +class GetGenerationQuorum { + std::vector<Future<Void>> futures; + std::map<ConfigGeneration, size_t> seenGenerations; + Promise<ConfigGeneration> result; + size_t totalRepliesReceived{ 0 }; + size_t maxAgreement{ 0 }; + size_t size{ 0 }; + Optional<Version> lastSeenLiveVersion; + + ACTOR static Future<Void> handleReplyActor(GetGenerationQuorum* self, + Future<ConfigTransactionGetGenerationReply> replyFuture) { + ConfigTransactionGetGenerationReply reply = wait(replyFuture); + ++self->totalRepliesReceived; + auto gen = reply.generation; + self->lastSeenLiveVersion = std::max(gen.liveVersion, self->lastSeenLiveVersion.orDefault(::invalidVersion)); + auto& count = self->seenGenerations[gen]; + ++count; + self->maxAgreement = std::max(count, self->maxAgreement); + if (count == self->size / 2 + 1) { + self->result.send(gen); // self may be destroyed here + } else if (self->maxAgreement + (self->size - self->totalRepliesReceived) < (self->size / 2 + 1)) { + self->result.sendError(failed_to_reach_quorum()); + } + return Void(); + } + +public: + GetGenerationQuorum(size_t size, Optional<Version> const& lastSeenLiveVersion) + : size(size), lastSeenLiveVersion(lastSeenLiveVersion) { + futures.reserve(size); + } + void addReplyCallback(Future<ConfigTransactionGetGenerationReply> replyFuture) { + futures.push_back(handleReplyActor(this, replyFuture)); + } + Future<ConfigGeneration> getGeneration() const { return result.getFuture(); } + Optional<Version> getLastSeenLiveVersion() const { return lastSeenLiveVersion; } +}; + +} // namespace + class PaxosConfigTransactionImpl { ConfigTransactionCommitRequest toCommit; Future<ConfigGeneration> getGenerationFuture; std::vector<ConfigTransactionInterface> ctis; int numRetries{ 0 }; bool committed{ false }; + Optional<Version> lastSeenLiveVersion; Optional<UID> dID; Database cx; ACTOR static Future<ConfigGeneration> getGeneration(PaxosConfigTransactionImpl* self) { - state std::vector<Future<ConfigTransactionGetGenerationReply>> getGenerationFutures; - getGenerationFutures.reserve(self->ctis.size()); - for (auto const& cti : self->ctis) { - getGenerationFutures.push_back(cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{})); + state GetGenerationQuorum quorum(self->ctis.size(), self->lastSeenLiveVersion); + state int retries = 0; + loop { + for (auto const& cti : self->ctis) { + quorum.addReplyCallback( + cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion })); + } + try { + state ConfigGeneration gen = wait(quorum.getGeneration()); + wait(delay(0.0)); // Let reply callback actors finish before destructing quorum + return gen; + } catch (Error& e) { + if (e.code() == error_code_failed_to_reach_quorum) { + TEST(true); // Failed to reach quorum getting generation + wait(delayJittered(0.01 * (1 << retries))); + ++retries; + } else { + throw e; + } + } + self->lastSeenLiveVersion = quorum.getLastSeenLiveVersion(); } - // FIXME: Must tolerate failures and disagreement - wait(waitForAll(getGenerationFutures)); - return getGenerationFutures[0].get().generation; } ACTOR static Future<Optional<Value>> get(PaxosConfigTransactionImpl* self, Key key) { @@ -48,7 +105,7 @@ class PaxosConfigTransactionImpl { } state ConfigKey configKey = ConfigKey::decodeKey(key); ConfigGeneration generation = wait(self->getGenerationFuture); - // TODO: Load balance + // TODO: Load balance, and only send request to replicas that we have gotten the current generation from ConfigTransactionGetReply reply = wait(self->ctis[0].get.getReply(ConfigTransactionGetRequest{ generation, configKey })); if (reply.value.present()) { diff --git a/fdbserver/ConfigNode.actor.cpp b/fdbserver/ConfigNode.actor.cpp index af452eb242..99cee322b0 100644 --- a/fdbserver/ConfigNode.actor.cpp +++ b/fdbserver/ConfigNode.actor.cpp @@ -203,6 +203,10 @@ class ConfigNodeImpl { ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) { state ConfigGeneration generation = wait(getGeneration(self)); ++generation.liveVersion; + if (req.lastSeenLiveVersion.present()) { + TEST(req.lastSeenLiveVersion.get() >= generation.liveVersion); // Node is lagging behind some other node + generation.liveVersion = std::max(generation.liveVersion, req.lastSeenLiveVersion.get() + 1); + } self->kvStore->set(KeyValueRef(currentGenerationKey, BinaryWriter::toValue(generation, IncludeVersion()))); wait(self->kvStore->commit()); req.reply.send(ConfigTransactionGetGenerationReply{ generation }); diff --git a/fdbserver/workloads/ConfigIncrement.actor.cpp b/fdbserver/workloads/ConfigIncrement.actor.cpp index 01ee092a78..27cba61dcb 100644 --- a/fdbserver/workloads/ConfigIncrement.actor.cpp +++ b/fdbserver/workloads/ConfigIncrement.actor.cpp @@ -66,13 +66,16 @@ class ConfigIncrementWorkload : public TestWorkload { try { state Reference<ISingleThreadTransaction> tr = self->getTransaction(cx); state int currentValue = wait(get(tr)); + ASSERT_GE(currentValue, self->lastKnownValue); set(tr, currentValue + 1); wait(delay(deterministicRandom()->random01() * 2 * self->meanSleepWithinTransactions)); wait(tr->commit()); ASSERT_GT(tr->getCommittedVersion(), self->lastKnownCommittedVersion); - ASSERT_GE(currentValue, self->lastKnownValue); self->lastKnownCommittedVersion = tr->getCommittedVersion(); self->lastKnownValue = currentValue + 1; + TraceEvent("ConfigIncrementSucceeded") + .detail("CommittedVersion", self->lastKnownCommittedVersion) + .detail("CommittedValue", self->lastKnownValue); ++self->transactions; ++trsComplete; wait(delay(deterministicRandom()->random01() * 2 * self->meanSleepBetweenTransactions)); diff --git a/flow/error_definitions.h b/flow/error_definitions.h index b69801cfd7..b77f622e48 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -77,6 +77,7 @@ ERROR( dd_not_found, 1053, "Data distributor not found") ERROR( wrong_connection_file, 1054, "Connection file mismatch") ERROR( version_already_compacted, 1055, "The requested changes have been compacted away") ERROR( local_config_changed, 1056, "Local configuration file has changed. Restart and apply these changes" ) +ERROR( failed_to_reach_quorum, 1057, "Failed to reach quorum from configuration database nodes. Retry sending these requests" ) ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )