From 2a59c5fd4ecdd3b09c8f5d3be2ef7128212678c3 Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Thu, 24 Mar 2022 19:20:42 -0700 Subject: [PATCH] Workers should monitor coordinators in submitCandidacy(). (#6655) * Workers should monitor coordinators in submitCandidacy(). * Change re-resolve delay to a knob. --- fdbclient/ClientKnobs.cpp | 1 + fdbclient/ClientKnobs.h | 1 + fdbclient/MonitorLeader.actor.cpp | 7 +- fdbserver/ClusterController.actor.cpp | 19 ++--- fdbserver/LeaderElection.actor.cpp | 114 ++++++++++++++++++-------- fdbserver/LeaderElection.h | 8 +- fdbserver/worker.actor.cpp | 6 +- 7 files changed, 102 insertions(+), 54 deletions(-) diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 5f098ffa82..114d5ed1a6 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -50,6 +50,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( MAX_GENERATIONS_OVERRIDE, 0 ); init( MAX_GENERATIONS_SIM, 50 ); //Disable network connections after this many generations in simulation, should be less than RECOVERY_DELAY_START_GENERATION + init( COORDINATOR_HOSTNAME_RESOLVE_DELAY, 0.05 ); init( COORDINATOR_RECONNECTION_DELAY, 1.0 ); init( CLIENT_EXAMPLE_AMOUNT, 20 ); init( MAX_CLIENT_STATUS_AGE, 1.0 ); diff --git a/fdbclient/ClientKnobs.h b/fdbclient/ClientKnobs.h index 8d6afa6d7b..7d40dfee49 100644 --- a/fdbclient/ClientKnobs.h +++ b/fdbclient/ClientKnobs.h @@ -49,6 +49,7 @@ public: double MAX_GENERATIONS_OVERRIDE; double MAX_GENERATIONS_SIM; + double COORDINATOR_HOSTNAME_RESOLVE_DELAY; double COORDINATOR_RECONNECTION_DELAY; int CLIENT_EXAMPLE_AMOUNT; double MAX_CLIENT_STATUS_AGE; diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 509953599b..3440822ec2 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -169,7 +169,7 @@ void ClusterConnectionString::resolveHostnamesBlocking() { } void ClusterConnectionString::resetToUnresolved() { - if (hostnames.size() > 0) { + if (status == RESOLVED && hostnames.size() > 0) { coords.clear(); hostnames.clear(); networkAddressToHostname.clear(); @@ -558,8 +558,8 @@ ACTOR Future monitorNominee(Key key, .detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname") .detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString()); if (rep.getError().code() == error_code_request_maybe_delivered) { - // 50 milliseconds delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(0.05)); + // Delay to prevent tight resolving loop due to outdated DNS cache + wait(delay(CLIENT_KNOBS->COORDINATOR_HOSTNAME_RESOLVE_DELAY)); throw coordinators_changed(); } else { throw rep.getError(); @@ -589,7 +589,6 @@ ACTOR Future monitorNominee(Key key, if (li.present() && li.get().forward) wait(Future(Never())); - wait(Future(Void())); } } } diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index b51b2182db..8d44c257f6 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -2475,11 +2475,12 @@ ACTOR Future workerHealthMonitor(ClusterControllerData* self) { } } -ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, +ACTOR Future clusterControllerCore(Reference connRecord, + ClusterControllerFullInterface interf, Future leaderFail, - ServerCoordinators coordinators, LocalityData locality, ConfigDBType configDBType) { + state ServerCoordinators coordinators(connRecord); state ClusterControllerData self(interf, locality, coordinators); state ConfigBroadcaster configBroadcaster(coordinators, configDBType); state Future coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY); @@ -2612,7 +2613,7 @@ ACTOR Future replaceInterface(ClusterControllerFullInterface interf) { } } -ACTOR Future clusterController(ServerCoordinators coordinators, +ACTOR Future clusterController(Reference connRecord, Reference>> currentCC, bool hasConnected, Reference> asyncPriorityInfo, @@ -2623,9 +2624,10 @@ ACTOR Future clusterController(ServerCoordinators coordinators, state bool inRole = false; cci.initEndpoints(); try { + wait(connRecord->resolveHostnames()); // Register as a possible leader; wait to be elected state Future leaderFail = - tryBecomeLeader(coordinators, cci, currentCC, hasConnected, asyncPriorityInfo); + tryBecomeLeader(connRecord, cci, currentCC, hasConnected, asyncPriorityInfo); state Future shouldReplace = replaceInterface(cci); while (!currentCC->get().present() || currentCC->get().get() != cci) { @@ -2644,7 +2646,7 @@ ACTOR Future clusterController(ServerCoordinators coordinators, startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID()); inRole = true; - wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType)); + wait(clusterControllerCore(connRecord, cci, leaderFail, locality, configDBType)); } } catch (Error& e) { if (inRole) @@ -2673,15 +2675,12 @@ ACTOR Future clusterController(Reference connRec state bool hasConnected = false; loop { try { - wait(connRecord->resolveHostnames()); - ServerCoordinators coordinators(connRecord); - wait(clusterController(coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType)); + wait(clusterController(connRecord, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType)); + hasConnected = true; } catch (Error& e) { if (e.code() != error_code_coordinators_changed) throw; // Expected to terminate fdbserver } - - hasConnected = true; } } diff --git a/fdbserver/LeaderElection.actor.cpp b/fdbserver/LeaderElection.actor.cpp index 1c968a515c..9d677b1dac 100644 --- a/fdbserver/LeaderElection.actor.cpp +++ b/fdbserver/LeaderElection.actor.cpp @@ -25,28 +25,56 @@ #include "fdbclient/MonitorLeader.h" #include "flow/actorcompiler.h" // This must be the last #include. +// Keep trying to become a leader by submitting itself to all coordinators. +// Monitor the health of all coordinators at the same time. +// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor +// to throw `coordinators_changed()` error ACTOR Future submitCandidacy(Key key, LeaderElectionRegInterface coord, LeaderInfo myInfo, UID prevChangeID, - Reference>>> nominees, - int index) { + AsyncTrigger* nomineeChange, + Optional* nominee, + Optional hostname = Optional()) { loop { - auto const& nom = nominees->get()[index]; - Optional li = wait( - retryBrokenPromise(coord.candidacy, - CandidacyRequest(key, myInfo, nom.present() ? nom.get().changeID : UID(), prevChangeID), - TaskPriority::CoordinationReply)); + state Optional li; - if (li != nominees->get()[index]) { - std::vector> v = nominees->get(); - v[index] = li; - nominees->set(v); + if (coord.candidacy.getEndpoint().getPrimaryAddress().fromHostname) { + state ErrorOr> rep = wait(coord.candidacy.tryGetReply( + CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID), + TaskPriority::CoordinationReply)); + if (rep.isError()) { + // Connecting to nominee failed, most likely due to connection failed. + TraceEvent("SubmitCandadicyError") + .error(rep.getError()) + .detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname") + .detail("OldAddr", coord.candidacy.getEndpoint().getPrimaryAddress().toString()); + if (rep.getError().code() == error_code_request_maybe_delivered) { + // Delay to prevent tight resolving loop due to outdated DNS cache + wait(delay(CLIENT_KNOBS->COORDINATOR_HOSTNAME_RESOLVE_DELAY)); + throw coordinators_changed(); + } else { + throw rep.getError(); + } + } else if (rep.present()) { + li = rep.get(); + } + } else { + Optional tmp = wait(retryBrokenPromise( + coord.candidacy, + CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID), + TaskPriority::CoordinationReply)); + li = tmp; + } + + wait(Future(Void())); // Make sure we weren't cancelled + + if (li != *nominee) { + *nominee = li; + nomineeChange->trigger(); if (li.present() && li.get().forward) wait(Future(Never())); - - wait(Future(Void())); // Make sure we weren't cancelled } } } @@ -84,13 +112,14 @@ ACTOR Future changeLeaderCoordinators(ServerCoordinators coordinators, Val return Void(); } -ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, +ACTOR Future tryBecomeLeaderInternal(Reference connRecord, Value proposedSerializedInterface, Reference> outSerializedLeader, bool hasConnected, Reference> asyncPriorityInfo) { - state Reference>>> nominees( - new AsyncVar>>()); + state ServerCoordinators coordinators(connRecord); + state AsyncTrigger nomineeChange; + state std::vector> nominees; state LeaderInfo myInfo; state Future candidacies; state bool iAmLeader = false; @@ -105,8 +134,6 @@ ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, wait(delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); } - nominees->set(std::vector>(coordinators.clientLeaderServers.size())); - myInfo.serializedInfo = proposedSerializedInterface; outSerializedLeader->set(Value()); @@ -114,6 +141,9 @@ ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) ? buggifyDelayedAsyncVar(outSerializedLeader) : Void(); while (!iAmLeader) { + wait(connRecord->resolveHostnames()); + coordinators = ServerCoordinators(connRecord); + nominees.resize(coordinators.leaderElectionServers.size()); state Future badCandidateTimeout; myInfo.changeID = deterministicRandom()->randomUniqueID(); @@ -122,13 +152,25 @@ ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, std::vector> cand; cand.reserve(coordinators.leaderElectionServers.size()); - for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) - cand.push_back(submitCandidacy( - coordinators.clusterKey, coordinators.leaderElectionServers[i], myInfo, prevChangeID, nominees, i)); + for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) { + Optional hostname; + auto r = connRecord->getConnectionString().networkAddressToHostname.find( + coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress()); + if (r != connRecord->getConnectionString().networkAddressToHostname.end()) { + hostname = r->second; + } + cand.push_back(submitCandidacy(coordinators.clusterKey, + coordinators.leaderElectionServers[i], + myInfo, + prevChangeID, + &nomineeChange, + &nominees[i], + hostname)); + } candidacies = waitForAll(cand); loop { - state Optional> leader = getLeader(nominees->get()); + state Optional> leader = getLeader(nominees); if (leader.present() && leader.get().first.forward) { // These coordinators are forwarded to another set. But before we change our own cluster file, we need // to make sure that a majority of coordinators know that. SOMEDAY: Wait briefly to see if other @@ -172,22 +214,30 @@ ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, // If more than 2*SERVER_KNOBS->POLLING_FREQUENCY elapses while we are nominated by some coordinator but // there is no leader, we might be breaking the leader election process for someone with better // communications but lower ID, so change IDs. - if ((!leader.present() || !leader.get().second) && - std::count(nominees->get().begin(), nominees->get().end(), myInfo)) { + if ((!leader.present() || !leader.get().second) && std::count(nominees.begin(), nominees.end(), myInfo)) { if (!badCandidateTimeout.isValid()) badCandidateTimeout = delay(SERVER_KNOBS->POLLING_FREQUENCY * 2, TaskPriority::CoordinationReply); } else badCandidateTimeout = Future(); - choose { - when(wait(nominees->onChange())) {} - when(wait(badCandidateTimeout.isValid() ? badCandidateTimeout : Never())) { - TEST(true); // Bad candidate timeout - TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log(); - break; + try { + choose { + when(wait(nomineeChange.onTrigger())) {} + when(wait(badCandidateTimeout.isValid() ? badCandidateTimeout : Never())) { + TEST(true); // Bad candidate timeout + TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log(); + break; + } + when(wait(candidacies)) { ASSERT(false); } + when(wait(asyncPriorityInfo->onChange())) { break; } + } + } catch (Error& e) { + if (e.code() == error_code_coordinators_changed) { + connRecord->getConnectionString().resetToUnresolved(); + break; + } else { + throw e; } - when(wait(candidacies)) { ASSERT(false); } - when(wait(asyncPriorityInfo->onChange())) { break; } } } diff --git a/fdbserver/LeaderElection.h b/fdbserver/LeaderElection.h index ad5d959ce5..9639116ec5 100644 --- a/fdbserver/LeaderElection.h +++ b/fdbserver/LeaderElection.h @@ -37,7 +37,7 @@ class ServerCoordinators; // eventually be set. If the return value is cancelled, the candidacy or leadership of the proposedInterface // will eventually end. template -Future tryBecomeLeader(ServerCoordinators const& coordinators, +Future tryBecomeLeader(Reference const& connRecord, LeaderInterface const& proposedInterface, Reference>> const& outKnownLeader, bool hasConnected, @@ -50,20 +50,20 @@ Future changeLeaderCoordinators(ServerCoordinators const& coordinators, Va #pragma region Implementation #endif // __INTEL_COMPILER -Future tryBecomeLeaderInternal(ServerCoordinators const& coordinators, +Future tryBecomeLeaderInternal(Reference const& connRecord, Value const& proposedSerializedInterface, Reference> const& outSerializedLeader, bool const& hasConnected, Reference> const& asyncPriorityInfo); template -Future tryBecomeLeader(ServerCoordinators const& coordinators, +Future tryBecomeLeader(Reference const& connRecord, LeaderInterface const& proposedInterface, Reference>> const& outKnownLeader, bool hasConnected, Reference> const& asyncPriorityInfo) { auto serializedInfo = makeReference>(); - Future m = tryBecomeLeaderInternal(coordinators, + Future m = tryBecomeLeaderInternal(connRecord, ObjectWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 6863ec39c6..0cc0faa57d 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -2726,8 +2726,6 @@ ACTOR Future fdbd(Reference connRecord, actors.push_back(serveProcess()); try { - wait(connRecord->resolveHostnames()); - ServerCoordinators coordinators(connRecord); if (g_network->isSimulated()) { whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,"; } @@ -2745,8 +2743,8 @@ ACTOR Future fdbd(Reference connRecord, if (coordFolder.size()) { // SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up // their files - actors.push_back(fileNotFoundToNever( - coordinationServer(coordFolder, coordinators.ccr, configNode, configBroadcastInterface))); + actors.push_back( + fileNotFoundToNever(coordinationServer(coordFolder, connRecord, configNode, configBroadcastInterface))); } state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));