From e40cc8722cad80e93eeb600cfaf6cd95d263d12b Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Wed, 20 Apr 2022 13:42:46 -0700 Subject: [PATCH] A few hostname improvements. (#6825) * Add tryResolveHostnames() in connection string. * Add missing hostname to related interfaces. * Do not pass RequestStream into *GetReplyFromHostname() functions. Because we are using new RequestStream for each request anyways. Also, the passed in pointer could be nullptr, which results in seg faults. * Add dynamic hostname resolve and reconnect intervals. * Address comments. --- fdbclient/ConfigTransactionInterface.cpp | 6 ++- fdbclient/ConfigTransactionInterface.h | 5 ++- fdbclient/CoordinationInterface.h | 4 ++ fdbclient/MonitorLeader.actor.cpp | 41 +++++++++++++------- fdbrpc/genericactors.actor.h | 38 +++++++++---------- fdbserver/ClusterController.actor.cpp | 48 ++++++++++-------------- fdbserver/Coordination.actor.cpp | 4 +- fdbserver/CoordinationInterface.h | 10 ++--- fdbserver/LeaderElection.actor.cpp | 2 +- flow/Hostname.actor.cpp | 4 +- flow/Hostname.h | 6 +++ flow/Knobs.cpp | 5 ++- flow/Knobs.h | 5 ++- 13 files changed, 102 insertions(+), 76 deletions(-) diff --git a/fdbclient/ConfigTransactionInterface.cpp b/fdbclient/ConfigTransactionInterface.cpp index fab26f3498..1f053a0052 100644 --- a/fdbclient/ConfigTransactionInterface.cpp +++ b/fdbclient/ConfigTransactionInterface.cpp @@ -34,12 +34,16 @@ void ConfigTransactionInterface::setupWellKnownEndpoints() { } ConfigTransactionInterface::ConfigTransactionInterface(NetworkAddress const& remote) - : getGeneration(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)), + : _id(deterministicRandom()->randomUniqueID()), + getGeneration(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)), get(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GET)), getClasses(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETCLASSES)), getKnobs(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETKNOBS)), commit(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_COMMIT)) {} +ConfigTransactionInterface::ConfigTransactionInterface(Hostname const& remote) + : _id(deterministicRandom()->randomUniqueID()), hostname(remote) {} + bool ConfigTransactionInterface::operator==(ConfigTransactionInterface const& rhs) const { return _id == rhs._id; } diff --git a/fdbclient/ConfigTransactionInterface.h b/fdbclient/ConfigTransactionInterface.h index 7489594ad0..98b65e4c4b 100644 --- a/fdbclient/ConfigTransactionInterface.h +++ b/fdbclient/ConfigTransactionInterface.h @@ -200,9 +200,12 @@ public: class RequestStream getKnobs; class RequestStream commit; + Optional hostname; + ConfigTransactionInterface(); void setupWellKnownEndpoints(); ConfigTransactionInterface(NetworkAddress const& remote); + ConfigTransactionInterface(Hostname const& remote); bool operator==(ConfigTransactionInterface const& rhs) const; bool operator!=(ConfigTransactionInterface const& rhs) const; @@ -210,6 +213,6 @@ public: template void serialize(Ar& ar) { - serializer(ar, getGeneration, get, getClasses, getKnobs, commit); + serializer(ar, getGeneration, get, getClasses, getKnobs, commit, hostname); } }; diff --git a/fdbclient/CoordinationInterface.h b/fdbclient/CoordinationInterface.h index c8f252088c..3f2f8bd385 100644 --- a/fdbclient/CoordinationInterface.h +++ b/fdbclient/CoordinationInterface.h @@ -104,6 +104,10 @@ public: ConnectionStringStatus status = RESOLVED; AsyncTrigger resolveFinish; + // This function tries to resolve all hostnames once, and return them with coords. + // Best effort, does not guarantee that the resolves succeed. + Future> tryResolveHostnames(); + std::vector coords; std::vector hostnames; std::unordered_map networkAddressToHostname; diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 678c47eba0..0b80e5076c 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -364,6 +364,29 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") { return Void(); } +ACTOR Future> tryResolveHostnamesImpl(ClusterConnectionString* self) { + state std::set allCoordinatorsSet; + std::vector> fs; + for (auto& hostname : self->hostnames) { + fs.push_back(map(hostname.resolve(), [&](Optional const& addr) -> Void { + if (addr.present()) { + allCoordinatorsSet.insert(addr.get()); + } + return Void(); + })); + } + wait(waitForAll(fs)); + for (const auto& coord : self->coords) { + allCoordinatorsSet.insert(coord); + } + std::vector allCoordinators(allCoordinatorsSet.begin(), allCoordinatorsSet.end()); + return allCoordinators; +} + +Future> ClusterConnectionString::tryResolveHostnames() { + return tryResolveHostnamesImpl(this); +} + TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") { std::string connectionString = "TestCluster:0@host.name:1234,host-name:5678"; std::string hn = "host-name", port = "5678"; @@ -373,19 +396,9 @@ TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") { INetworkConnections::net()->addMockTCPEndpoint(hn, port, { address }); state ClusterConnectionString cs(connectionString); - - state std::unordered_set coordinatorAddresses; - std::vector> fs; - for (auto& hostname : cs.hostnames) { - fs.push_back(map(hostname.resolve(), [&](Optional const& addr) -> Void { - if (addr.present()) { - coordinatorAddresses.insert(addr.get()); - } - return Void(); - })); - } - wait(waitForAll(fs)); - ASSERT(coordinatorAddresses.size() == 1 && coordinatorAddresses.count(address) == 1); + state std::vector allCoordinators = wait(cs.tryResolveHostnames()); + ASSERT(allCoordinators.size() == 1 && + std::find(allCoordinators.begin(), allCoordinators.end(), address) != allCoordinators.end()); return Void(); } @@ -585,7 +598,7 @@ ACTOR Future monitorNominee(Key key, .detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString()); if (rep.getError().code() == error_code_request_maybe_delivered) { // Delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL)); throw coordinators_changed(); } else { throw rep.getError(); diff --git a/fdbrpc/genericactors.actor.h b/fdbrpc/genericactors.actor.h index 6226676ae1..955d3f913e 100644 --- a/fdbrpc/genericactors.actor.h +++ b/fdbrpc/genericactors.actor.h @@ -73,10 +73,7 @@ Future retryBrokenPromise(RequestStream to, Req request } ACTOR template -Future> tryGetReplyFromHostname(RequestStream* to, - Req request, - Hostname hostname, - WellKnownEndpoints token) { +Future> tryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token) { // A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname. // If resolving fails, return lookup_failed(). // Otherwise, return tryGetReply(request). @@ -84,8 +81,8 @@ Future> tryGetReplyFromHostname(RequestStream* to, if (!address.present()) { return ErrorOr(lookup_failed()); } - *to = RequestStream(Endpoint::wellKnown({ address.get() }, token)); - ErrorOr reply = wait(to->tryGetReply(request)); + RequestStream to(Endpoint::wellKnown({ address.get() }, token)); + state ErrorOr reply = wait(to.tryGetReply(request)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { @@ -98,8 +95,7 @@ Future> tryGetReplyFromHostname(RequestStream* to, } ACTOR template -Future> tryGetReplyFromHostname(RequestStream* to, - Req request, +Future> tryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token, TaskPriority taskID) { @@ -110,8 +106,8 @@ Future> tryGetReplyFromHostname(RequestStream* to, if (!address.present()) { return ErrorOr(lookup_failed()); } - *to = RequestStream(Endpoint::wellKnown({ address.get() }, token)); - ErrorOr reply = wait(to->tryGetReply(request, taskID)); + RequestStream to(Endpoint::wellKnown({ address.get() }, token)); + state ErrorOr reply = wait(to.tryGetReply(request, taskID)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { @@ -124,21 +120,21 @@ Future> tryGetReplyFromHostname(RequestStream* to, } ACTOR template -Future retryGetReplyFromHostname(RequestStream* to, - Req request, - Hostname hostname, - WellKnownEndpoints token) { +Future retryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token) { // Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname. // Suitable for use with hostname, where RequestStream is NOT initialized yet. // Not normally useful for endpoints initialized with NetworkAddress. + state double reconnetInterval = FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL; loop { NetworkAddress address = wait(hostname.resolveWithRetry()); - *to = RequestStream(Endpoint::wellKnown({ address }, token)); - ErrorOr reply = wait(to->tryGetReply(request)); + RequestStream to(Endpoint::wellKnown({ address }, token)); + state ErrorOr reply = wait(to.tryGetReply(request)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { // Connection failure. + wait(delay(reconnetInterval)); + reconnetInterval = std::min(2 * reconnetInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL); hostname.resetToUnresolved(); INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service); } else { @@ -151,22 +147,24 @@ Future retryGetReplyFromHostname(RequestStream* to, } ACTOR template -Future retryGetReplyFromHostname(RequestStream* to, - Req request, +Future retryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token, TaskPriority taskID) { // Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname. // Suitable for use with hostname, where RequestStream is NOT initialized yet. // Not normally useful for endpoints initialized with NetworkAddress. + state double reconnetInterval = FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL; loop { NetworkAddress address = wait(hostname.resolveWithRetry()); - *to = RequestStream(Endpoint::wellKnown({ address }, token)); - ErrorOr reply = wait(to->tryGetReply(request, taskID)); + RequestStream to(Endpoint::wellKnown({ address }, token)); + state ErrorOr reply = wait(to.tryGetReply(request, taskID)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { // Connection failure. + wait(delay(reconnetInterval)); + reconnetInterval = std::min(2 * reconnetInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL); hostname.resetToUnresolved(); INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service); } else { diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 21e42a5f5c..a8160f82a5 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1097,18 +1097,24 @@ void haltRegisteringOrCurrentSingleton(ClusterControllerData* self, } } -void registerWorker(RegisterWorkerRequest req, - ClusterControllerData* self, - std::unordered_set coordinatorAddresses, - ConfigBroadcaster* configBroadcaster) { +ACTOR Future registerWorker(RegisterWorkerRequest req, + ClusterControllerData* self, + ClusterConnectionString cs, + ConfigBroadcaster* configBroadcaster) { + std::vector coordinatorAddresses = wait(cs.tryResolveHostnames()); + const WorkerInterface& w = req.wi; ProcessClass newProcessClass = req.processClass; auto info = self->id_worker.find(w.locality.processId()); ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo; newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController); + bool isCoordinator = - (coordinatorAddresses.count(req.wi.address()) > 0) || - (req.wi.secondaryAddress().present() && coordinatorAddresses.count(req.wi.secondaryAddress().get()) > 0); + (std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.address()) != + coordinatorAddresses.end()) || + (req.wi.secondaryAddress().present() && + std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.secondaryAddress().get()) != + coordinatorAddresses.end()); for (auto it : req.incompatiblePeers) { self->db.incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL; @@ -1271,6 +1277,8 @@ void registerWorker(RegisterWorkerRequest req, if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) { req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo)); } + + return Void(); } #define TIME_KEEPER_VERSION LiteralStringRef("1") @@ -2543,30 +2551,12 @@ ACTOR Future clusterControllerCore(Reference con when(RecruitBlobWorkerRequest req = waitNext(interf.recruitBlobWorker.getFuture())) { clusterRecruitBlobWorker(&self, req); } - when(state RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) { + when(RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) { ++self.registerWorkerRequests; - state ClusterConnectionString ccs = coordinators.ccr->getConnectionString(); - - state std::unordered_set coordinatorAddresses; - std::vector> fs; - for (auto& hostname : ccs.hostnames) { - fs.push_back(map(hostname.resolve(), [&](Optional const& addr) -> Void { - if (addr.present()) { - coordinatorAddresses.insert(addr.get()); - } - return Void(); - })); - } - wait(waitForAll(fs)); - - for (const auto& coord : ccs.coordinators()) { - coordinatorAddresses.insert(coord); - } - - registerWorker(req, - &self, - coordinatorAddresses, - (configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster); + self.addActor.send(registerWorker(req, + &self, + coordinators.ccr->getConnectionString(), + (configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster)); } when(GetWorkersRequest req = waitNext(interf.getWorkers.getFuture())) { ++self.getWorkersRequests; diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 851007fef7..53086e5516 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -75,7 +75,7 @@ struct GenerationRegVal { } }; -GenerationRegInterface::GenerationRegInterface(NetworkAddress remote) +GenerationRegInterface::GenerationRegInterface(NetworkAddress const& remote) : read(Endpoint::wellKnown({ remote }, WLTOKEN_GENERATIONREG_READ)), write(Endpoint::wellKnown({ remote }, WLTOKEN_GENERATIONREG_WRITE)) {} @@ -84,7 +84,7 @@ GenerationRegInterface::GenerationRegInterface(INetwork* local) { write.makeWellKnownEndpoint(WLTOKEN_GENERATIONREG_WRITE, TaskPriority::Coordination); } -LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress remote) +LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress const& remote) : ClientLeaderRegInterface(remote), candidacy(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_CANDIDACY)), electionResult(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT)), leaderHeartbeat(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT)), diff --git a/fdbserver/CoordinationInterface.h b/fdbserver/CoordinationInterface.h index 461904787c..b966678cc6 100644 --- a/fdbserver/CoordinationInterface.h +++ b/fdbserver/CoordinationInterface.h @@ -53,9 +53,9 @@ struct GenerationRegInterface { // the v2 of the previous generation is the v1 of the next. GenerationRegInterface() {} - GenerationRegInterface(NetworkAddress remote); + GenerationRegInterface(NetworkAddress const& remote); GenerationRegInterface(INetwork* local); - GenerationRegInterface(Hostname hostname) : hostname(hostname){}; + GenerationRegInterface(Hostname const& hostname) : hostname(hostname){}; }; struct UniqueGeneration { @@ -128,9 +128,9 @@ struct LeaderElectionRegInterface : ClientLeaderRegInterface { RequestStream forward; LeaderElectionRegInterface() {} - LeaderElectionRegInterface(NetworkAddress remote); + LeaderElectionRegInterface(NetworkAddress const& remote); LeaderElectionRegInterface(INetwork* local); - LeaderElectionRegInterface(Hostname hostname) : ClientLeaderRegInterface(hostname) {} + LeaderElectionRegInterface(Hostname const& hostname) : ClientLeaderRegInterface(hostname) {} }; struct CandidacyRequest { @@ -220,7 +220,7 @@ class ConfigNode; class ServerCoordinators : public ClientCoordinators { public: - explicit ServerCoordinators(Reference); + explicit ServerCoordinators(Reference ccr); std::vector leaderElectionServers; std::vector stateServers; diff --git a/fdbserver/LeaderElection.actor.cpp b/fdbserver/LeaderElection.actor.cpp index 04f7923a1a..272e07c1ec 100644 --- a/fdbserver/LeaderElection.actor.cpp +++ b/fdbserver/LeaderElection.actor.cpp @@ -51,7 +51,7 @@ ACTOR Future submitCandidacy(Key key, .detail("OldAddr", coord.candidacy.getEndpoint().getPrimaryAddress().toString()); if (rep.getError().code() == error_code_request_maybe_delivered) { // Delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL)); throw coordinators_changed(); } else { throw rep.getError(); diff --git a/flow/Hostname.actor.cpp b/flow/Hostname.actor.cpp index 6562f2d484..ab89280a44 100644 --- a/flow/Hostname.actor.cpp +++ b/flow/Hostname.actor.cpp @@ -84,13 +84,15 @@ ACTOR Future> resolveImpl(Hostname* self) { } ACTOR Future resolveWithRetryImpl(Hostname* self) { + state double resolveInterval = FLOW_KNOBS->HOSTNAME_RESOLVE_INIT_INTERVAL; loop { try { Optional address = wait(resolveImpl(self)); if (address.present()) { return address.get(); } - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + wait(delay(resolveInterval)); + resolveInterval = std::min(2 * resolveInterval, FLOW_KNOBS->HOSTNAME_RESOLVE_MAX_INTERVAL); } catch (Error& e) { ASSERT(e.code() == error_code_actor_cancelled); throw; diff --git a/flow/Hostname.h b/flow/Hostname.h index 0faa4c53df..2492a17370 100644 --- a/flow/Hostname.h +++ b/flow/Hostname.h @@ -74,6 +74,7 @@ struct Hostname { Optional resolvedAddress; enum HostnameStatus { UNRESOLVED, RESOLVING, RESOLVED }; + // The resolve functions below use DNS cache. Future> resolve(); Future resolveWithRetry(); Optional resolveBlocking(); // This one should only be used when resolving asynchronously is @@ -81,6 +82,11 @@ struct Hostname { void resetToUnresolved(); HostnameStatus status = UNRESOLVED; AsyncTrigger resolveFinish; + + template + void serialize(Ar& ar) { + serializer(ar, host, service, isTLS, resolvedAddress, status); + } }; #endif diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 555112c16f..652ff709d2 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -40,7 +40,10 @@ FlowKnobs const* FLOW_KNOBS = &bootstrapGlobalFlowKnobs; void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( AUTOMATIC_TRACE_DUMP, 1 ); init( PREVENT_FAST_SPIN_DELAY, .01 ); - init( HOSTNAME_RESOLVE_DELAY, .05 ); + init( HOSTNAME_RESOLVE_INIT_INTERVAL, .05 ); + init( HOSTNAME_RESOLVE_MAX_INTERVAL, 1.0 ); + init( HOSTNAME_RECONNECT_INIT_INTERVAL, .05 ); + init( HOSTNAME_RECONNECT_MAX_INTERVAL, 1.0 ); init( CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED, 1.0 ); init( DELAY_JITTER_OFFSET, 0.9 ); diff --git a/flow/Knobs.h b/flow/Knobs.h index d2faff94af..4234010c37 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -113,7 +113,10 @@ class FlowKnobs : public KnobsImpl { public: int AUTOMATIC_TRACE_DUMP; double PREVENT_FAST_SPIN_DELAY; - double HOSTNAME_RESOLVE_DELAY; + double HOSTNAME_RESOLVE_INIT_INTERVAL; + double HOSTNAME_RESOLVE_MAX_INTERVAL; + double HOSTNAME_RECONNECT_INIT_INTERVAL; + double HOSTNAME_RECONNECT_MAX_INTERVAL; double CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED; double DELAY_JITTER_OFFSET;