Ensure change coordinators request came from same generation proxy

There was a rare but possible issue where a change coordinators request
could be in flight, but reading `\xff/coordinators` would return the old
set of coordinators, even though a previous attempt to write it returned
`commit_unknown_result`. This happened if there was an unrelated
recovery between setting `\xff/coordinators` and the updated
coordinators reaching each machine. The `commit_unknown_result` caused
by this recovery meant the change coordinators retry loop would read
`\xff/coordinators`, but since the request was ongoing it would read the
old set of coordinators. The fix is to have the cluster controller
ignore any change coordinators request from an old generation, meaning
when the retry loop reads the old coordiantors from `\xff/coordinators`,
it is guaranteed that the in progress change coordinators message will
be rejected when arriving at the cluster controller.
This commit is contained in:
Lukas Joswiak 2022-06-20 10:11:37 -07:00
parent bc62b538b1
commit c357a90b30
4 changed files with 36 additions and 3 deletions

View File

@ -804,6 +804,8 @@ ACTOR Future<Optional<ClusterConnectionString>> getConnectionString(Database cx)
}
}
static std::vector<std::string> connectionStrings;
namespace {
ACTOR Future<Optional<ClusterConnectionString>> getClusterConnectionStringFromStorageServer(Transaction* tr) {
@ -821,6 +823,19 @@ ACTOR Future<Optional<ClusterConnectionString>> getClusterConnectionStringFromSt
Version readVersion = wait(tr->getReadVersion());
state Optional<Value> currentKey = wait(tr->get(coordinatorsKey));
if (g_network->isSimulated() && currentKey.present()) {
// If the change coordinators request succeeded, the coordinators
// should have changed to the connection string of the most
// recently issued request. If instead the connection string is
// equal to one of the previously issued requests, there is a bug
// and we are breaking the promises we make with
// commit_unknown_result (the transaction must no longer be in
// progress when receiving this error).
int n = connectionStrings.size() > 0 ? connectionStrings.size() - 1 : 0; // avoid underflow
for (int i = 0; i < n; ++i) {
ASSERT(currentKey.get() != connectionStrings.at(i));
}
}
if (!currentKey.present()) {
// Someone deleted this key entirely?
@ -879,10 +894,12 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
std::sort(old.hostnames.begin(), old.hostnames.end());
std::sort(old.coords.begin(), old.coords.end());
if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) {
connectionStrings.clear();
return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
}
conn->parseKey(newName + ':' + deterministicRandom()->randomAlphaNumeric(32));
connectionStrings.push_back(conn->toString());
if (g_network->isSimulated()) {
int i = 0;

View File

@ -521,6 +521,19 @@ ACTOR Future<Void> changeCoordinators(Reference<ClusterRecoveryData> self) {
TraceEvent("ChangeCoordinators", self->dbgid).log();
++self->changeCoordinatorsRequests;
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
if (self->masterInterface.id() != changeCoordinatorsRequest.masterId) {
// Make sure the request is coming from a proxy from the same
// generation. If not, throw coordinators_changed - this is OK
// because the client will still receive commit_unknown_result, and
// will retry the request. This check is necessary because
// otherwise in rare circumstances where a recovery occurs between
// the change coordinators request from the client and the cstate
// actually being moved, the client may think the change
// coordinators command failed when it is still in progress. So we
// preempt the issue here and force failure if the generations
// don't match.
throw coordinators_changed();
}
// Kill cluster controller to facilitate coordinator registration update
if (self->controllerData->shouldCommitSuicide) {

View File

@ -1138,7 +1138,8 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
if (!self->isMyFirstBatch &&
pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get() != self->oldCoordinators.get()) {
wait(brokenPromiseToNever(pProxyCommitData->db->get().clusterInterface.changeCoordinators.getReply(
ChangeCoordinatorsRequest(pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get()))));
ChangeCoordinatorsRequest(pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get(),
self->pProxyCommitData->master.id()))));
ASSERT(false); // ChangeCoordinatorsRequest should always throw
}

View File

@ -83,13 +83,15 @@ struct ChangeCoordinatorsRequest {
constexpr static FileIdentifier file_identifier = 13605416;
Standalone<StringRef> newConnectionString;
ReplyPromise<Void> reply; // normally throws even on success!
UID masterId;
ChangeCoordinatorsRequest() {}
ChangeCoordinatorsRequest(Standalone<StringRef> newConnectionString) : newConnectionString(newConnectionString) {}
ChangeCoordinatorsRequest(Standalone<StringRef> newConnectionString, UID masterId)
: newConnectionString(newConnectionString), masterId(masterId) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, newConnectionString, reply);
serializer(ar, newConnectionString, reply, masterId);
}
};