From c69a07a8583b50152859a35a6e2d3f670c3c18e2 Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Wed, 27 Apr 2022 21:54:13 -0700 Subject: [PATCH] Check in the new Hostname logic. (#6926) * Revert #6655. 20220407-031010-renxuan-c101052c21da8346 compressed=True data_size=31004844 duration=4310801 ended=100000 fail_fast=10 max_runs=100000 pass=100000 priority=100 remaining=0 runtime=1:04:15 sanity=False started=100047 stopped=20220407-041425 submitted=20220407-031010 timeout=5400 username=renxuan * Revert #6271. 20220407-051532-renxuan-470f0fe6aac1c217 compressed=True data_size=30982370 duration=3491067 ended=100002 fail_fast=10 max_runs=100000 pass=100002 priority=100 remaining=0 runtime=0:59:57 sanity=False started=100141 stopped=20220407-061529 submitted=20220407-051532 timeout=5400 username=renxuan * Revert #6266. Remove resolving-related functionalities in connection string. Connection string will be used for storing purpose only, and non-mutable. 20220407-175119-renxuan-55d30ee1a4b42c2f compressed=True data_size=30970443 duration=5437659 ended=100000 fail_fast=10 max_runs=100000 pass=100000 priority=100 remaining=0 runtime=0:59:31 sanity=False started=100154 stopped=20220407-185050 submitted=20220407-175119 timeout=5400 username=renxuan * Add hostname to coordinator interfaces. * Turn on the new hostname logic. * Add the corresponding change in config txns. The most notable change is before calling basicLoadBalance(), we need to call tryInitializeRequestStream() to initialize request streams first. Passed correctness tests. * Return error when hostnames cannot be resolved in coordinators command. * Minor fixes. 
--- fdbcli/fdbcli.actor.cpp | 1 - fdbclient/AutoPublicAddress.cpp | 54 +- fdbclient/CoordinationInterface.h | 49 +- fdbclient/ManagementAPI.actor.cpp | 59 ++- fdbclient/ManagementAPI.actor.h | 4 +- fdbclient/MonitorLeader.actor.cpp | 495 +++++++----------- fdbclient/MonitorLeader.h | 4 +- fdbclient/NativeAPI.actor.cpp | 48 +- fdbclient/PaxosConfigTransaction.actor.cpp | 81 ++- fdbclient/SimpleConfigTransaction.actor.cpp | 66 ++- fdbclient/SpecialKeySpace.actor.cpp | 45 +- fdbclient/StatusClient.actor.cpp | 36 +- fdbrpc/genericactors.actor.h | 14 + fdbserver/ClusterController.actor.cpp | 21 +- fdbserver/ClusterRecovery.actor.cpp | 3 +- fdbserver/CoordinatedState.actor.cpp | 25 +- fdbserver/Coordination.actor.cpp | 60 +-- fdbserver/CoordinationInterface.h | 10 +- fdbserver/LeaderElection.actor.cpp | 113 ++-- fdbserver/LeaderElection.h | 8 +- fdbserver/PaxosConfigConsumer.actor.cpp | 64 ++- fdbserver/QuietDatabase.actor.cpp | 5 +- fdbserver/SimpleConfigConsumer.actor.cpp | 47 +- fdbserver/SimulatedCluster.actor.cpp | 4 +- fdbserver/Status.actor.cpp | 17 +- fdbserver/fdbserver.actor.cpp | 23 +- fdbserver/worker.actor.cpp | 61 ++- .../workloads/ConsistencyCheck.actor.cpp | 5 +- .../workloads/RemoveServersSafely.actor.cpp | 7 +- .../SpecialKeySpaceCorrectness.actor.cpp | 17 +- 30 files changed, 764 insertions(+), 682 deletions(-) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 97827aca6d..5f5f7d25fc 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -1189,7 +1189,6 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { ClusterConnectionFile::lookupClusterFileName(opt.clusterFile); try { ccf = makeReference(resolvedClusterFile.first); - wait(ccf->resolveHostnames()); } catch (Error& e) { if (e.code() == error_code_operation_cancelled) { throw; diff --git a/fdbclient/AutoPublicAddress.cpp b/fdbclient/AutoPublicAddress.cpp index c77414ded3..9b94d89e2c 100644 --- a/fdbclient/AutoPublicAddress.cpp +++ 
b/fdbclient/AutoPublicAddress.cpp @@ -28,28 +28,46 @@ #include "fdbclient/CoordinationInterface.h" -// Determine public IP address by calling the first coordinator. +// Determine public IP address by calling the first available coordinator. +// If fail connecting all coordinators, throw bind_failed(). IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) { - try { - using namespace boost::asio; + int size = ccs.coordinators().size() + ccs.hostnames.size(); + int index = 0; + loop { + try { + using namespace boost::asio; - io_service ioService; - ip::udp::socket socket(ioService); + io_service ioService; + ip::udp::socket socket(ioService); - ccs.resolveHostnamesBlocking(); - const auto& coordAddr = ccs.coordinators()[0]; - const auto boostIp = coordAddr.ip.isV6() ? ip::address(ip::address_v6(coordAddr.ip.toV6())) - : ip::address(ip::address_v4(coordAddr.ip.toV4())); + NetworkAddress coordAddr; + // Try coords first, because they don't need to be resolved. + if (index < ccs.coordinators().size()) { + coordAddr = ccs.coordinators()[index]; + } else { + Hostname& h = ccs.hostnames[index - ccs.coordinators().size()]; + Optional resolvedAddr = h.resolveBlocking(); + if (!resolvedAddr.present()) { + throw lookup_failed(); + } + coordAddr = resolvedAddr.get(); + } + const auto boostIp = coordAddr.ip.isV6() ? ip::address(ip::address_v6(coordAddr.ip.toV6())) + : ip::address(ip::address_v4(coordAddr.ip.toV4())); - ip::udp::endpoint endpoint(boostIp, coordAddr.port); - socket.connect(endpoint); - IPAddress ip = coordAddr.ip.isV6() ? IPAddress(socket.local_endpoint().address().to_v6().to_bytes()) - : IPAddress(socket.local_endpoint().address().to_v4().to_ulong()); - socket.close(); + ip::udp::endpoint endpoint(boostIp, coordAddr.port); + socket.connect(endpoint); + IPAddress ip = coordAddr.ip.isV6() ? 
IPAddress(socket.local_endpoint().address().to_v6().to_bytes()) + : IPAddress(socket.local_endpoint().address().to_v4().to_ulong()); + socket.close(); - return ip; - } catch (boost::system::system_error e) { - fprintf(stderr, "Error determining public address: %s\n", e.what()); - throw bind_failed(); + return ip; + } catch (...) { + ++index; + if (index == size) { + fprintf(stderr, "Error determining public address.\n"); + throw bind_failed(); + } + } } } diff --git a/fdbclient/CoordinationInterface.h b/fdbclient/CoordinationInterface.h index 3f2f8bd385..65a18cfa7c 100644 --- a/fdbclient/CoordinationInterface.h +++ b/fdbclient/CoordinationInterface.h @@ -61,61 +61,31 @@ struct ClientLeaderRegInterface { // - There is no address present more than once class ClusterConnectionString { public: - enum ConnectionStringStatus { RESOLVED, RESOLVING, UNRESOLVED }; - ClusterConnectionString() {} - ClusterConnectionString(const std::string& connStr); + ClusterConnectionString(const std::string& connectionString); ClusterConnectionString(const std::vector& coordinators, Key key); ClusterConnectionString(const std::vector& hosts, Key key); - ClusterConnectionString(const ClusterConnectionString& rhs) { operator=(rhs); } - ClusterConnectionString& operator=(const ClusterConnectionString& rhs) { - // Copy everything except AsyncTrigger resolveFinish. 
- status = rhs.status; - coords = rhs.coords; - hostnames = rhs.hostnames; - networkAddressToHostname = rhs.networkAddressToHostname; - key = rhs.key; - keyDesc = rhs.keyDesc; - connectionString = rhs.connectionString; - return *this; - } - std::vector const& coordinators() const { return coords; } - void addResolved(const Hostname& hostname, const NetworkAddress& address) { - coords.push_back(address); - networkAddressToHostname.emplace(address, hostname); - } Key clusterKey() const { return key; } Key clusterKeyName() const { return keyDesc; } // Returns the "name" or "description" part of the clusterKey (the part before the ':') std::string toString() const; static std::string getErrorString(std::string const& source, Error const& e); - Future resolveHostnames(); - // This one should only be used when resolving asynchronously is impossible. For all other cases, resolveHostnames() - // should be preferred. - void resolveHostnamesBlocking(); - // This function derives the member connectionString from the current key, coordinators and hostnames. - void resetConnectionString(); - void resetToUnresolved(); void parseKey(const std::string& key); - ConnectionStringStatus status = RESOLVED; - AsyncTrigger resolveFinish; // This function tries to resolve all hostnames once, and return them with coords. // Best effort, does not guarantee that the resolves succeed. Future> tryResolveHostnames(); std::vector coords; std::vector hostnames; - std::unordered_map networkAddressToHostname; private: void parseConnString(); Key key, keyDesc; - std::string connectionString; }; FDB_DECLARE_BOOLEAN_PARAM(ConnectionStringNeedsPersisted); @@ -165,12 +135,6 @@ public: // Signals to the connection record that it was successfully used to connect to a cluster. void notifyConnected(); - ClusterConnectionString::ConnectionStringStatus connectionStringStatus() const; - Future resolveHostnames(); - // This one should only be used when resolving asynchronously is impossible. 
For all other cases, resolveHostnames() - // should be preferred. - void resolveHostnamesBlocking(); - virtual void addref() = 0; virtual void delref() = 0; @@ -275,12 +239,21 @@ struct OpenDatabaseCoordRequest { Standalone> supportedVersions; UID knownClientInfoID; Key clusterKey; + std::vector hostnames; std::vector coordinators; ReplyPromise> reply; template void serialize(Ar& ar) { - serializer(ar, issues, supportedVersions, traceLogGroup, knownClientInfoID, clusterKey, coordinators, reply); + serializer(ar, + issues, + supportedVersions, + traceLogGroup, + knownClientInfoID, + clusterKey, + hostnames, + coordinators, + reply); } }; diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 1fcfbd1bc9..894e2ebd9d 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -782,7 +782,7 @@ ACTOR Future> getWorkers(Database cx) { } } -ACTOR Future> getCoordinators(Database cx) { +ACTOR Future> getConnectionString(Database cx) { state Transaction tr(cx); loop { try { @@ -790,9 +790,8 @@ ACTOR Future> getCoordinators(Database cx) { tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); Optional currentKey = wait(tr.get(coordinatorsKey)); if (!currentKey.present()) - return std::vector(); - - return ClusterConnectionString(currentKey.get().toString()).coordinators(); + return Optional(); + return ClusterConnectionString(currentKey.get().toString()); } catch (Error& e) { wait(tr.onError(e)); } @@ -801,7 +800,7 @@ ACTOR Future> getCoordinators(Database cx) { ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, - ClusterConnectionString* conn) { + std::vector desiredCoordinators) { tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); @@ -812,47 +811,45 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key 
entirely? state ClusterConnectionString old(currentKey.get().toString()); - wait(old.resolveHostnames()); if (tr->getDatabase()->getConnectionRecord() && old.clusterKeyName().toString() != tr->getDatabase()->getConnectionRecord()->getConnectionString().clusterKeyName()) return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database?? + state std::vector oldCoordinators = wait(old.tryResolveHostnames()); state CoordinatorsResult result = CoordinatorsResult::SUCCESS; - if (!conn->coords.size()) { - std::vector desiredCoordinatorAddresses = wait(change->getDesiredCoordinators( + if (!desiredCoordinators.size()) { + std::vector _desiredCoordinators = wait(change->getDesiredCoordinators( tr, - old.coordinators(), + oldCoordinators, Reference(new ClusterConnectionMemoryRecord(old)), result)); - conn->coords = desiredCoordinatorAddresses; + desiredCoordinators = _desiredCoordinators; } if (result != CoordinatorsResult::SUCCESS) return result; - if (!conn->coordinators().size()) + if (!desiredCoordinators.size()) return CoordinatorsResult::INVALID_NETWORK_ADDRESSES; - std::sort(conn->coords.begin(), conn->coords.end()); - std::sort(conn->hostnames.begin(), conn->hostnames.end()); + std::sort(desiredCoordinators.begin(), desiredCoordinators.end()); std::string newName = change->getDesiredClusterKeyName(); if (newName.empty()) newName = old.clusterKeyName().toString(); - if (old.coordinators() == conn->coordinators() && old.clusterKeyName() == newName) + if (oldCoordinators == desiredCoordinators && old.clusterKeyName() == newName) return CoordinatorsResult::SAME_NETWORK_ADDRESSES; - std::string key(newName + ':' + deterministicRandom()->randomAlphaNumeric(32)); - conn->parseKey(key); - conn->resetConnectionString(); + state ClusterConnectionString conn(desiredCoordinators, + StringRef(newName + ':' + deterministicRandom()->randomAlphaNumeric(32))); if (g_network->isSimulated()) { int i = 0; int protectedCount = 0; - while ((protectedCount < 
((conn->coordinators().size() / 2) + 1)) && (i < conn->coordinators().size())) { - auto process = g_simulator.getProcessByAddress(conn->coordinators()[i]); + while ((protectedCount < ((desiredCoordinators.size() / 2) + 1)) && (i < desiredCoordinators.size())) { + auto process = g_simulator.getProcessByAddress(desiredCoordinators[i]); auto addresses = process->addresses; if (!process->isReliable()) { @@ -864,14 +861,14 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, if (addresses.secondaryAddress.present()) { g_simulator.protectedAddresses.insert(process->addresses.secondaryAddress.get()); } - TraceEvent("ProtectCoordinator").detail("Address", conn->coordinators()[i]).backtrace(); + TraceEvent("ProtectCoordinator").detail("Address", desiredCoordinators[i]).backtrace(); protectedCount++; i++; } } std::vector>> leaderServers; - ClientCoordinators coord(Reference(new ClusterConnectionMemoryRecord(*conn))); + ClientCoordinators coord(Reference(new ClusterConnectionMemoryRecord(conn))); leaderServers.reserve(coord.clientLeaderServers.size()); for (int i = 0; i < coord.clientLeaderServers.size(); i++) @@ -883,7 +880,7 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, when(wait(waitForAll(leaderServers))) {} when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; } } - tr->set(coordinatorsKey, conn->toString()); + tr->set(coordinatorsKey, conn.toString()); return Optional(); } @@ -909,11 +906,12 @@ ACTOR Future changeQuorum(Database cx, ReferencegetConnectionRecord()->getConnectionString().clusterKeyName()) return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database?? 
+ state std::vector oldCoordinators = wait(old.tryResolveHostnames()); state CoordinatorsResult result = CoordinatorsResult::SUCCESS; if (!desiredCoordinators.size()) { std::vector _desiredCoordinators = wait(change->getDesiredCoordinators( &tr, - old.coordinators(), + oldCoordinators, Reference(new ClusterConnectionMemoryRecord(old)), result)); desiredCoordinators = _desiredCoordinators; @@ -937,7 +935,7 @@ ACTOR Future changeQuorum(Database cx, Reference>> leaderServers; leaderServers.reserve(coord.clientLeaderServers.size()); for (int i = 0; i < coord.clientLeaderServers.size(); i++) { - leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, - GetLeaderRequest(coord.clusterKey, UID()), - TaskPriority::CoordinationReply)); + if (coord.clientLeaderServers[i].hostname.present()) { + leaderServers.push_back(retryGetReplyFromHostname(GetLeaderRequest(coord.clusterKey, UID()), + coord.clientLeaderServers[i].hostname.get(), + WLTOKEN_CLIENTLEADERREG_GETLEADER, + TaskPriority::CoordinationReply)); + } else { + leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, + GetLeaderRequest(coord.clusterKey, UID()), + TaskPriority::CoordinationReply)); + } } Optional>> results = wait(timeout(getAll(leaderServers), CLIENT_KNOBS->IS_ACCEPTABLE_DELAY)); diff --git a/fdbclient/ManagementAPI.actor.h b/fdbclient/ManagementAPI.actor.h index 64c54447a7..82c8bb4ee9 100644 --- a/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/ManagementAPI.actor.h @@ -56,7 +56,7 @@ struct IQuorumChange : ReferenceCounted { // Change to use the given set of coordination servers ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, - ClusterConnectionString* conn); + std::vector desiredCoordinators); ACTOR Future changeQuorum(Database cx, Reference change); Reference autoQuorumChange(int desired = -1); Reference noQuorumChange(); @@ -146,7 +146,7 @@ ACTOR Future setHealthyZone(Database cx, StringRef zoneId, double seconds, ACTOR Future 
waitForPrimaryDC(Database cx, StringRef dcId); // Gets the cluster connection string -ACTOR Future> getCoordinators(Database cx); +ACTOR Future> getConnectionString(Database cx); void schemaCoverage(std::string const& spath, bool covered = true); bool schemaMatch(json_spirit::mValue const& schema, diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 0b80e5076c..ad08c8ba3d 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -77,18 +77,6 @@ void IClusterConnectionRecord::setPersisted() { connectionStringNeedsPersisted = false; } -ClusterConnectionString::ConnectionStringStatus IClusterConnectionRecord::connectionStringStatus() const { - return cs.status; -} - -Future IClusterConnectionRecord::resolveHostnames() { - return cs.resolveHostnames(); -} - -void IClusterConnectionRecord::resolveHostnamesBlocking() { - cs.resolveHostnamesBlocking(); -} - std::string ClusterConnectionString::getErrorString(std::string const& source, Error const& e) { if (e.code() == error_code_connection_string_invalid) { return format("Invalid connection string `%s: %d %s", source.c_str(), e.code(), e.what()); @@ -97,101 +85,19 @@ std::string ClusterConnectionString::getErrorString(std::string const& source, E } } -ACTOR Future resolveHostnamesImpl(ClusterConnectionString* self) { - loop { - if (self->status == ClusterConnectionString::UNRESOLVED) { - self->status = ClusterConnectionString::RESOLVING; - std::vector> fs; - for (auto const& hostname : self->hostnames) { - fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostname.host, hostname.service), - [=](std::vector const& addresses) -> Void { - NetworkAddress address = - addresses[deterministicRandom()->randomInt(0, addresses.size())]; - address.flags = 0; // Reset the parsed address to public - address.fromHostname = NetworkAddressFromHostname::True; - if (hostname.isTLS) { - address.flags |= NetworkAddress::FLAG_TLS; - } - 
self->addResolved(hostname, address); - return Void(); - })); - } - wait(waitForAll(fs)); - std::sort(self->coords.begin(), self->coords.end()); - if (std::unique(self->coords.begin(), self->coords.end()) != self->coords.end()) { - self->status = ClusterConnectionString::UNRESOLVED; - self->resolveFinish.trigger(); - throw connection_string_invalid(); - } - self->status = ClusterConnectionString::RESOLVED; - self->resolveFinish.trigger(); - break; - } else if (self->status == ClusterConnectionString::RESOLVING) { - wait(self->resolveFinish.onTrigger()); - if (self->status == ClusterConnectionString::RESOLVED) { - break; - } - // Otherwise, this means other threads failed on resolve, so here we go back to the loop and try to resolve - // again. - } else { - // status is RESOLVED, nothing to do. - break; - } - } - return Void(); -} - -Future ClusterConnectionString::resolveHostnames() { - return resolveHostnamesImpl(this); -} - -void ClusterConnectionString::resolveHostnamesBlocking() { - if (status != RESOLVED) { - status = RESOLVING; - for (auto const& hostname : hostnames) { - std::vector addresses = - INetworkConnections::net()->resolveTCPEndpointBlocking(hostname.host, hostname.service); - NetworkAddress address = addresses[deterministicRandom()->randomInt(0, addresses.size())]; - address.flags = 0; // Reset the parsed address to public - address.fromHostname = NetworkAddressFromHostname::True; - if (hostname.isTLS) { - address.flags |= NetworkAddress::FLAG_TLS; - } - addResolved(hostname, address); - } - std::sort(coords.begin(), coords.end()); - if (std::unique(coords.begin(), coords.end()) != coords.end()) { - status = UNRESOLVED; - throw connection_string_invalid(); - } - status = RESOLVED; - } -} - -void ClusterConnectionString::resetToUnresolved() { - if (status == RESOLVED && hostnames.size() > 0) { - coords.clear(); - hostnames.clear(); - networkAddressToHostname.clear(); - status = UNRESOLVED; - parseConnString(); - } -} - -void 
ClusterConnectionString::resetConnectionString() { - connectionString = toString(); -} - -void ClusterConnectionString::parseConnString() { +ClusterConnectionString::ClusterConnectionString(const std::string& connectionString) { + auto trimmed = trim(connectionString); // Split on '@' into key@addrs - int pAt = connectionString.find_first_of('@'); - if (pAt == connectionString.npos) { + int pAt = trimmed.find_first_of('@'); + if (pAt == trimmed.npos) { throw connection_string_invalid(); } - std::string key = connectionString.substr(0, pAt); - std::string addrs = connectionString.substr(pAt + 1); + std::string key = trimmed.substr(0, pAt); + std::string addrs = trimmed.substr(pAt + 1); parseKey(key); + std::set hostnameSet; + std::set addressSet; std::string curAddr; for (int p = 0; p <= addrs.size();) { int pComma = addrs.find_first_of(',', p); @@ -199,31 +105,29 @@ void ClusterConnectionString::parseConnString() { pComma = addrs.size(); curAddr = addrs.substr(p, pComma - p); if (Hostname::isHostname(curAddr)) { + Hostname h = Hostname::parse(curAddr); + // Check that there are no duplicate hostnames + if (hostnameSet.find(h) != hostnameSet.end()) { + throw connection_string_invalid(); + } hostnames.push_back(Hostname::parse(curAddr)); + hostnameSet.insert(h); } else { - coords.push_back(NetworkAddress::parse(curAddr)); + NetworkAddress n = NetworkAddress::parse(curAddr); + // Check that there are no duplicate addresses + if (addressSet.find(n) != addressSet.end()) { + throw connection_string_invalid(); + } + coords.push_back(n); + addressSet.insert(n); } p = pComma + 1; } - if (hostnames.size() > 0) { - status = UNRESOLVED; - } ASSERT((coords.size() + hostnames.size()) > 0); - - std::sort(coords.begin(), coords.end()); - // Check that there are no duplicate addresses - if (std::unique(coords.begin(), coords.end()) != coords.end()) { - throw connection_string_invalid(); - } -} - -ClusterConnectionString::ClusterConnectionString(const std::string& connStr) { - 
connectionString = trim(connStr); - parseConnString(); } TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/addresses") { - std::string input; + state std::string input; { input = "asdf:2345@1.1.1.1:345"; @@ -231,6 +135,15 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/addresses") { ASSERT(input == cs.toString()); } + { + input = "asdf:2345@1.1.1.1:345,1.1.1.1:345"; + try { + ClusterConnectionString cs(input); + } catch (Error& e) { + ASSERT(e.code() == error_code_connection_string_invalid); + } + } + { input = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443"; ClusterConnectionString cs(input); @@ -274,20 +187,27 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/addresses") { } TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") { - std::string input; + state std::string input; { input = "asdf:2345@localhost:1234"; ClusterConnectionString cs(input); - ASSERT(cs.status == ClusterConnectionString::UNRESOLVED); ASSERT(cs.hostnames.size() == 1); ASSERT(input == cs.toString()); } + { + input = "asdf:2345@localhost:1234,localhost:1234"; + try { + ClusterConnectionString cs(input); + } catch (Error& e) { + ASSERT(e.code() == error_code_connection_string_invalid); + } + } + { input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443"; ClusterConnectionString cs(input); - ASSERT(cs.status == ClusterConnectionString::UNRESOLVED); ASSERT(cs.hostnames.size() == 2); ASSERT(input == cs.toString()); } @@ -300,7 +220,6 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") { commented += "# asdfasdf ##"; ClusterConnectionString cs(commented); - ASSERT(cs.status == ClusterConnectionString::UNRESOLVED); ASSERT(cs.hostnames.size() == 2); ASSERT(input == cs.toString()); } @@ -313,7 +232,6 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") { commented += "# asdfasdf ##"; ClusterConnectionString cs(commented); - ASSERT(cs.status == ClusterConnectionString::UNRESOLVED); 
ASSERT(cs.hostnames.size() == 2); ASSERT(input == cs.toString()); } @@ -321,44 +239,30 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") { return Void(); } -TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") { - state std::string connectionString = "TestCluster:0@localhost:1234,host-name:5678"; - std::string hn1 = "localhost", port1 = "1234"; - state std::string hn2 = "host-name"; - state std::string port2 = "5678"; - state std::vector hostnames; - hostnames.push_back(Hostname::parse(hn1 + ":" + port1)); - hostnames.push_back(Hostname::parse(hn2 + ":" + port2)); +TEST_CASE("/fdbclient/MonitorLeader/ConnectionString/hostname") { + std::string connectionString = "TestCluster:0@localhost:1234,host-name:5678"; + std::string hn1 = "localhost", port1 = "1234", hn2 = "host-name", port2 = "5678"; + std::vector hostnames; - NetworkAddress address1 = NetworkAddress::parse("127.0.0.0:1234"); - NetworkAddress address2 = NetworkAddress::parse("127.0.0.1:5678"); + { + hostnames.push_back(Hostname::parse(hn1 + ":" + port1)); + hostnames.push_back(Hostname::parse(hn2 + ":" + port2)); - INetworkConnections::net()->addMockTCPEndpoint(hn1, port1, { address1 }); - INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address2 }); + ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0")); + ASSERT(cs.hostnames.size() == 2); + ASSERT(cs.coordinators().size() == 0); + ASSERT(cs.toString() == connectionString); + } - state ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0")); - ASSERT(cs.status == ClusterConnectionString::UNRESOLVED); - ASSERT(cs.hostnames.size() == 2); - ASSERT(cs.coordinators().size() == 0); - wait(cs.resolveHostnames()); - ASSERT(cs.status == ClusterConnectionString::RESOLVED); - ASSERT(cs.hostnames.size() == 2); - ASSERT(cs.coordinators().size() == 2); - ASSERT(cs.toString() == connectionString); - cs.resetToUnresolved(); - ASSERT(cs.status == ClusterConnectionString::UNRESOLVED); - 
ASSERT(cs.hostnames.size() == 2); - ASSERT(cs.coordinators().size() == 0); - ASSERT(cs.toString() == connectionString); - - INetworkConnections::net()->removeMockTCPEndpoint(hn2, port2); - NetworkAddress address3 = NetworkAddress::parse("127.0.0.0:5678"); - INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address3 }); - - try { - wait(cs.resolveHostnames()); - } catch (Error& e) { - ASSERT(e.code() == error_code_connection_string_invalid); + { + hostnames.clear(); + hostnames.push_back(Hostname::parse(hn1 + ":" + port1)); + hostnames.push_back(Hostname::parse(hn1 + ":" + port1)); + try { + ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0")); + } catch (Error& e) { + ASSERT(e.code() == error_code_connection_string_invalid); + } } return Void(); @@ -380,6 +284,7 @@ ACTOR Future> tryResolveHostnamesImpl(ClusterConnect allCoordinatorsSet.insert(coord); } std::vector allCoordinators(allCoordinatorsSet.begin(), allCoordinatorsSet.end()); + std::sort(allCoordinators.begin(), allCoordinators.end()); return allCoordinators; } @@ -484,17 +389,22 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/fuzz") { } ClusterConnectionString::ClusterConnectionString(const std::vector& servers, Key key) - : status(RESOLVED), coords(servers) { + : coords(servers) { + std::set s(servers.begin(), servers.end()); + if (s.size() != servers.size()) { + throw connection_string_invalid(); + } std::string keyString = key.toString(); parseKey(keyString); - resetConnectionString(); } -ClusterConnectionString::ClusterConnectionString(const std::vector& hosts, Key key) - : status(UNRESOLVED), hostnames(hosts) { +ClusterConnectionString::ClusterConnectionString(const std::vector& hosts, Key key) : hostnames(hosts) { + std::set h(hosts.begin(), hosts.end()); + if (h.size() != hosts.size()) { + throw connection_string_invalid(); + } std::string keyString = key.toString(); parseKey(keyString); - resetConnectionString(); } void 
ClusterConnectionString::parseKey(const std::string& key) { @@ -529,13 +439,11 @@ void ClusterConnectionString::parseKey(const std::string& key) { std::string ClusterConnectionString::toString() const { std::string s = key.toString(); s += '@'; - for (int i = 0; i < coords.size(); i++) { - if (networkAddressToHostname.find(coords[i]) == networkAddressToHostname.end()) { - if (s.find('@') != s.length() - 1) { - s += ','; - } - s += coords[i].toString(); + for (auto const& coord : coords) { + if (s.find('@') != s.length() - 1) { + s += ','; } + s += coord.toString(); } for (auto const& host : hostnames) { if (s.find('@') != s.length() - 1) { @@ -547,11 +455,14 @@ std::string ClusterConnectionString::toString() const { } ClientCoordinators::ClientCoordinators(Reference ccr) : ccr(ccr) { - ASSERT(ccr->connectionStringStatus() == ClusterConnectionString::RESOLVED); ClusterConnectionString cs = ccr->getConnectionString(); - for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) - clientLeaderServers.push_back(ClientLeaderRegInterface(*s)); clusterKey = cs.clusterKey(); + for (auto h : cs.hostnames) { + clientLeaderServers.push_back(ClientLeaderRegInterface(h)); + } + for (auto s : cs.coordinators()) { + clientLeaderServers.push_back(ClientLeaderRegInterface(s)); + } } ClientCoordinators::ClientCoordinators(Key clusterKey, std::vector coordinators) @@ -576,49 +487,32 @@ ClientLeaderRegInterface::ClientLeaderRegInterface(INetwork* local) { // Nominee is the worker among all workers that are considered as leader by one coordinator // This function contacts a coordinator coord to ask who is its nominee. 
-// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor -// to throw `coordinators_changed()` error ACTOR Future monitorNominee(Key key, ClientLeaderRegInterface coord, AsyncTrigger* nomineeChange, - Optional* info, - Optional hostname = Optional()) { + Optional* info) { loop { state Optional li; - - if (coord.getLeader.getEndpoint().getPrimaryAddress().fromHostname) { - state ErrorOr> rep = - wait(coord.getLeader.tryGetReply(GetLeaderRequest(key, info->present() ? info->get().changeID : UID()), - TaskPriority::CoordinationReply)); - if (rep.isError()) { - // Connecting to nominee failed, most likely due to connection failed. - TraceEvent("MonitorNomineeError") - .error(rep.getError()) - .detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname") - .detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString()); - if (rep.getError().code() == error_code_request_maybe_delivered) { - // Delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL)); - throw coordinators_changed(); - } else { - throw rep.getError(); - } - } else if (rep.present()) { - li = rep.get(); - } + if (coord.hostname.present()) { + wait(store(li, + retryGetReplyFromHostname(GetLeaderRequest(key, info->present() ? info->get().changeID : UID()), + coord.hostname.get(), + WLTOKEN_CLIENTLEADERREG_GETLEADER, + TaskPriority::CoordinationReply))); } else { - Optional tmp = - wait(retryBrokenPromise(coord.getLeader, - GetLeaderRequest(key, info->present() ? info->get().changeID : UID()), - TaskPriority::CoordinationReply)); - li = tmp; + wait(store(li, + retryBrokenPromise(coord.getLeader, + GetLeaderRequest(key, info->present() ? 
info->get().changeID : UID()), + TaskPriority::CoordinationReply))); } wait(Future(Void())); // Make sure we weren't cancelled TraceEvent("GetLeaderReply") .suppressFor(1.0) - .detail("Coordinator", coord.getLeader.getEndpoint().getPrimaryAddress()) + .detail("Coordinator", + coord.hostname.present() ? coord.hostname.get().toString() + : coord.getLeader.getEndpoint().getPrimaryAddress().toString()) .detail("Nominee", li.present() ? li.get().changeID : UID()) .detail("ClusterKey", key.printable()); @@ -687,74 +581,54 @@ Optional> getLeader(const std::vector monitorLeaderOneGeneration(Reference connRecord, Reference> outSerializedLeaderInfo, MonitorLeaderInfo info) { + state ClientCoordinators coordinators(info.intermediateConnRecord); + state AsyncTrigger nomineeChange; + state std::vector> nominees; + state Future allActors; + + nominees.resize(coordinators.clientLeaderServers.size()); + + state std::vector> actors; + // Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator. + actors.reserve(coordinators.clientLeaderServers.size()); + for (int i = 0; i < coordinators.clientLeaderServers.size(); i++) { + actors.push_back( + monitorNominee(coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i])); + } + allActors = waitForAll(actors); + loop { - wait(connRecord->resolveHostnames()); - wait(info.intermediateConnRecord->resolveHostnames()); - state ClientCoordinators coordinators(info.intermediateConnRecord); - state AsyncTrigger nomineeChange; - state std::vector> nominees; - state Future allActors; - - nominees.resize(coordinators.clientLeaderServers.size()); - - state std::vector> actors; - // Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator. 
- actors.reserve(coordinators.clientLeaderServers.size()); - for (int i = 0; i < coordinators.clientLeaderServers.size(); i++) { - Optional hostname; - auto r = connRecord->getConnectionString().networkAddressToHostname.find( - coordinators.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress()); - if (r != connRecord->getConnectionString().networkAddressToHostname.end()) { - hostname = r->second; - } - actors.push_back(monitorNominee( - coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], hostname)); - } - allActors = waitForAll(actors); - - loop { - Optional> leader = getLeader(nominees); - TraceEvent("MonitorLeaderChange") - .detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1, 1)); - if (leader.present()) { - if (leader.get().first.forward) { - TraceEvent("MonitorLeaderForwarding") - .detail("NewConnStr", leader.get().first.serializedInfo.toString()) - .detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString()) - .trackLatest("MonitorLeaderForwarding"); - info.intermediateConnRecord = connRecord->makeIntermediateRecord( - ClusterConnectionString(leader.get().first.serializedInfo.toString())); - return info; - } - if (connRecord != info.intermediateConnRecord) { - if (!info.hasConnected) { - TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection") - .detail("ClusterFile", connRecord->toString()) - .detail("StoredConnectionString", connRecord->getConnectionString().toString()) - .detail("CurrentConnectionString", - info.intermediateConnRecord->getConnectionString().toString()); - } - connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()); - info.intermediateConnRecord = connRecord; - } - - info.hasConnected = true; - connRecord->notifyConnected(); - - outSerializedLeaderInfo->set(leader.get().first.serializedInfo); - } - try { - wait(nomineeChange.onTrigger() || allActors); - } catch (Error& e) { - if (e.code() == 
error_code_coordinators_changed) { - TraceEvent("MonitorLeaderCoordinatorsChanged").suppressFor(1.0); - connRecord->getConnectionString().resetToUnresolved(); - break; - } else { - throw e; - } + Optional> leader = getLeader(nominees); + TraceEvent("MonitorLeaderChange") + .detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1, 1)); + if (leader.present()) { + if (leader.get().first.forward) { + TraceEvent("MonitorLeaderForwarding") + .detail("NewConnStr", leader.get().first.serializedInfo.toString()) + .detail("OldConnStr", info.intermediateConnRecord->getConnectionString().toString()) + .trackLatest("MonitorLeaderForwarding"); + info.intermediateConnRecord = connRecord->makeIntermediateRecord( + ClusterConnectionString(leader.get().first.serializedInfo.toString())); + return info; } + if (connRecord != info.intermediateConnRecord) { + if (!info.hasConnected) { + TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection") + .detail("ClusterFile", connRecord->toString()) + .detail("StoredConnectionString", connRecord->getConnectionString().toString()) + .detail("CurrentConnectionString", + info.intermediateConnRecord->getConnectionString().toString()); + } + connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()); + info.intermediateConnRecord = connRecord; + } + + info.hasConnected = true; + connRecord->notifyConnected(); + + outSerializedLeaderInfo->set(leader.get().first.serializedInfo); } + wait(nomineeChange.onTrigger() || allActors); } } @@ -885,10 +759,10 @@ ACTOR Future getClientInfoFromLeader(Reference monitorLeaderAndGetClientInfo(Key clusterKey, + std::vector hostnames, std::vector coordinators, ClientData* clientData, - Reference>> leaderInfo, - Reference> coordinatorsChanged) { + Reference>> leaderInfo) { state std::vector clientLeaderServers; state AsyncTrigger nomineeChange; state std::vector> nominees; @@ -896,8 +770,12 @@ ACTOR Future monitorLeaderAndGetClientInfo(Key clusterKey, 
state Reference>> knownLeader( new AsyncVar>{}); - for (auto s = coordinators.begin(); s != coordinators.end(); ++s) { - clientLeaderServers.push_back(ClientLeaderRegInterface(*s)); + clientLeaderServers.reserve(hostnames.size() + coordinators.size()); + for (auto h : hostnames) { + clientLeaderServers.push_back(ClientLeaderRegInterface(h)); + } + for (auto s : coordinators) { + clientLeaderServers.push_back(ClientLeaderRegInterface(s)); } nominees.resize(clientLeaderServers.size()); @@ -936,14 +814,7 @@ ACTOR Future monitorLeaderAndGetClientInfo(Key clusterKey, leaderInfo->set(leader.get().first); } } - try { - wait(nomineeChange.onTrigger() || allActors); - } catch (Error& e) { - if (e.code() == error_code_coordinators_changed) { - coordinatorsChanged->trigger(); - } - throw e; - } + wait(nomineeChange.onTrigger() || allActors); } } @@ -995,7 +866,8 @@ ACTOR Future monitorProxiesOneGeneration( Reference>>> supportedVersions, Key traceLogGroup) { state ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString(); - state std::vector addrs = cs.coordinators(); + state std::vector hostnames = cs.hostnames; + state int coordinatorsSize = hostnames.size() + cs.coordinators().size(); state int index = 0; state int successIndex = 0; state Optional incorrectTime; @@ -1003,15 +875,26 @@ ACTOR Future monitorProxiesOneGeneration( state std::vector lastCommitProxies; state std::vector lastGrvProxyUIDs; state std::vector lastGrvProxies; + state std::vector clientLeaderServers; + + clientLeaderServers.reserve(coordinatorsSize); + for (const auto& h : hostnames) { + clientLeaderServers.push_back(ClientLeaderRegInterface(h)); + } + for (const auto& c : cs.coordinators()) { + clientLeaderServers.push_back(ClientLeaderRegInterface(c)); + } + + deterministicRandom()->randomShuffle(clientLeaderServers); - deterministicRandom()->randomShuffle(addrs); loop { - state ClientLeaderRegInterface clientLeaderServer(addrs[index]); + state ClientLeaderRegInterface 
clientLeaderServer = clientLeaderServers[index]; state OpenDatabaseCoordRequest req; coordinator->set(clientLeaderServer); req.clusterKey = cs.clusterKey(); + req.hostnames = hostnames; req.coordinators = cs.coordinators(); req.knownClientInfoID = clientInfo->get().id; req.supportedVersions = supportedVersions->get(); @@ -1040,8 +923,16 @@ ACTOR Future monitorProxiesOneGeneration( incorrectTime = Optional(); } - state ErrorOr> rep = - wait(clientLeaderServer.openDatabase.tryGetReply(req, TaskPriority::CoordinationReply)); + state ErrorOr> rep; + if (clientLeaderServer.hostname.present()) { + wait(store(rep, + tryGetReplyFromHostname(req, + clientLeaderServer.hostname.get(), + WLTOKEN_CLIENTLEADERREG_OPENDATABASE, + TaskPriority::CoordinationReply))); + } else { + wait(store(rep, clientLeaderServer.openDatabase.tryGetReply(req, TaskPriority::CoordinationReply))); + } if (rep.present()) { if (rep.get().read().forward.present()) { TraceEvent("MonitorProxiesForwarding") @@ -1072,15 +963,10 @@ ACTOR Future monitorProxiesOneGeneration( successIndex = index; } else { TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cant talk to cluster controller - if (rep.getError().code() == error_code_coordinators_changed) { - throw coordinators_changed(); - } - index = (index + 1) % addrs.size(); + TEST(rep.getError().code() == error_code_lookup_failed); // Coordinator hostname resolving failure + index = (index + 1) % coordinatorsSize; if (index == successIndex) { wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY)); - // When the client fails talking to all coordinators, we throw coordinators_changed() and let the caller - // re-resolve the connection string and retry. 
- throw coordinators_changed(); } } } @@ -1092,27 +978,16 @@ ACTOR Future monitorProxies( Reference>> coordinator, Reference>>> supportedVersions, Key traceLogGroup) { - wait(connRecord->get()->resolveHostnames()); state MonitorLeaderInfo info(connRecord->get()); loop { - try { - wait(info.intermediateConnRecord->resolveHostnames()); - choose { - when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration( - connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) { - info = _info; - } - when(wait(connRecord->onChange())) { - info.hasConnected = false; - info.intermediateConnRecord = connRecord->get(); - } + choose { + when(MonitorLeaderInfo _info = wait(monitorProxiesOneGeneration( + connRecord->get(), clientInfo, coordinator, info, supportedVersions, traceLogGroup))) { + info = _info; } - } catch (Error& e) { - if (e.code() == error_code_coordinators_changed) { - TraceEvent("MonitorProxiesCoordinatorsChanged").suppressFor(1.0); - info.intermediateConnRecord->getConnectionString().resetToUnresolved(); - } else { - throw e; + when(wait(connRecord->onChange())) { + info.hasConnected = false; + info.intermediateConnRecord = connRecord->get(); } } } diff --git a/fdbclient/MonitorLeader.h b/fdbclient/MonitorLeader.h index 6ceed69a63..8819b5702b 100644 --- a/fdbclient/MonitorLeader.h +++ b/fdbclient/MonitorLeader.h @@ -75,10 +75,10 @@ Future monitorLeader(Reference const& connFile, // nominees, the nominee with the most nomination is the leader, and collects client data from the leader. This function // also monitors the change of the leader. 
Future monitorLeaderAndGetClientInfo(Key const& clusterKey, + std::vector const& hostnames, std::vector const& coordinators, ClientData* const& clientData, - Reference>> const& leaderInfo, - Reference> const& coordinatorsChanged); + Reference>> const& leaderInfo); Future monitorProxies( Reference>> const& connRecord, diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 61c68bc18e..ec8409ac91 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6927,11 +6927,18 @@ Future> Transaction::getVersionstamp() { } // Gets the protocol version reported by a coordinator via the protocol info interface -ACTOR Future getCoordinatorProtocol(NetworkAddressList coordinatorAddresses) { - RequestStream requestStream{ Endpoint::wellKnown({ coordinatorAddresses }, - WLTOKEN_PROTOCOL_INFO) }; - ProtocolInfoReply reply = wait(retryBrokenPromise(requestStream, ProtocolInfoRequest{})); - +ACTOR Future getCoordinatorProtocol( + Reference> const> coordinator) { + state ProtocolInfoReply reply; + if (coordinator->get().get().hostname.present()) { + wait(store(reply, + retryGetReplyFromHostname( + ProtocolInfoRequest{}, coordinator->get().get().hostname.get(), WLTOKEN_PROTOCOL_INFO))); + } else { + RequestStream requestStream( + Endpoint::wellKnown({ coordinator->get().get().getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO)); + wait(store(reply, retryBrokenPromise(requestStream, ProtocolInfoRequest{}))); + } return reply.version; } @@ -6940,8 +6947,16 @@ ACTOR Future getCoordinatorProtocol(NetworkAddressList coordina // function will return with an unset result. 
// If an expected version is given, this future won't return if the actual protocol version matches the expected version ACTOR Future> getCoordinatorProtocolFromConnectPacket( - NetworkAddress coordinatorAddress, + Reference> const> coordinator, Optional expectedVersion) { + state NetworkAddress coordinatorAddress; + if (coordinator->get().get().hostname.present()) { + Hostname h = coordinator->get().get().hostname.get(); + wait(store(coordinatorAddress, h.resolveWithRetry())); + } else { + coordinatorAddress = coordinator->get().get().getLeader.getEndpoint().getPrimaryAddress(); + } + state Reference> const> protocolVersion = FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress); @@ -6976,11 +6991,10 @@ ACTOR Future getClusterProtocolImpl( if (!coordinator->get().present()) { wait(coordinator->onChange()); } else { - Endpoint coordinatorEndpoint = coordinator->get().get().getLeader.getEndpoint(); if (needToConnect) { // Even though we typically rely on the connect packet to get the protocol version, we need to send some // request in order to start a connection. This protocol version request serves that purpose. 
- protocolVersion = getCoordinatorProtocol(coordinatorEndpoint.addresses); + protocolVersion = getCoordinatorProtocol(coordinator); needToConnect = false; } choose { @@ -6996,8 +7010,8 @@ ACTOR Future getClusterProtocolImpl( // Older versions of FDB don't have an endpoint to return the protocol version, so we get this info from // the connect packet - when(Optional pv = wait(getCoordinatorProtocolFromConnectPacket( - coordinatorEndpoint.getPrimaryAddress(), expectedVersion))) { + when(Optional pv = + wait(getCoordinatorProtocolFromConnectPacket(coordinator, expectedVersion))) { if (pv.present()) { return pv.get(); } else { @@ -8171,14 +8185,20 @@ ACTOR Future checkSafeExclusions(Database cx, std::vectorgetConnectionRecord()->resolveHostnames()); state ClientCoordinators coordinatorList(cx->getConnectionRecord()); state std::vector>> leaderServers; leaderServers.reserve(coordinatorList.clientLeaderServers.size()); for (int i = 0; i < coordinatorList.clientLeaderServers.size(); i++) { - leaderServers.push_back(retryBrokenPromise(coordinatorList.clientLeaderServers[i].getLeader, - GetLeaderRequest(coordinatorList.clusterKey, UID()), - TaskPriority::CoordinationReply)); + if (coordinatorList.clientLeaderServers[i].hostname.present()) { + leaderServers.push_back(retryGetReplyFromHostname(GetLeaderRequest(coordinatorList.clusterKey, UID()), + coordinatorList.clientLeaderServers[i].hostname.get(), + WLTOKEN_CLIENTLEADERREG_GETLEADER, + TaskPriority::CoordinationReply)); + } else { + leaderServers.push_back(retryBrokenPromise(coordinatorList.clientLeaderServers[i].getLeader, + GetLeaderRequest(coordinatorList.clusterKey, UID()), + TaskPriority::CoordinationReply)); + } } // Wait for quorum so we don't dismiss live coordinators as unreachable by acting too fast choose { diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index b6fc75fec9..50ebbd1a4c 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ 
b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -59,8 +59,14 @@ class CommitQuorum { ConfigGeneration generation, ConfigTransactionInterface cti) { try { - wait(timeoutError(cti.commit.getReply(self->getCommitRequest(generation)), - CLIENT_KNOBS->COMMIT_QUORUM_TIMEOUT)); + if (cti.hostname.present()) { + wait(timeoutError(retryGetReplyFromHostname( + self->getCommitRequest(generation), cti.hostname.get(), WLTOKEN_CONFIGTXN_COMMIT), + CLIENT_KNOBS->COMMIT_QUORUM_TIMEOUT)); + } else { + wait(timeoutError(cti.commit.getReply(self->getCommitRequest(generation)), + CLIENT_KNOBS->COMMIT_QUORUM_TIMEOUT)); + } ++self->successful; } catch (Error& e) { // self might be destroyed if this actor is cancelled @@ -122,9 +128,20 @@ class GetGenerationQuorum { ACTOR static Future addRequestActor(GetGenerationQuorum* self, ConfigTransactionInterface cti) { loop { try { - ConfigTransactionGetGenerationReply reply = wait(timeoutError( - cti.getGeneration.getReply(ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }), - CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT)); + state ConfigTransactionGetGenerationReply reply; + if (cti.hostname.present()) { + wait(timeoutError(store(reply, + retryGetReplyFromHostname( + ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion }, + cti.hostname.get(), + WLTOKEN_CONFIGTXN_GETGENERATION)), + CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT)); + } else { + wait(timeoutError(store(reply, + cti.getGeneration.getReply( + ConfigTransactionGetGenerationRequest{ self->lastSeenLiveVersion })), + CLIENT_KNOBS->GET_GENERATION_QUORUM_TIMEOUT)); + } ++self->totalRepliesReceived; auto gen = reply.generation; @@ -225,9 +242,18 @@ class PaxosConfigTransactionImpl { state ConfigKey configKey = ConfigKey::decodeKey(key); loop { try { - ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration()); - state Reference configNodes( - new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas())); + state ConfigGeneration 
generation = wait(self->getGenerationQuorum.getGeneration()); + state std::vector readReplicas = + self->getGenerationQuorum.getReadReplicas(); + std::vector> fs; + for (ConfigTransactionInterface& readReplica : readReplicas) { + if (readReplica.hostname.present()) { + fs.push_back(tryInitializeRequestStream( + &readReplica.get, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GET)); + } + } + wait(waitForAll(fs)); + state Reference configNodes(new ConfigTransactionInfo(readReplicas)); ConfigTransactionGetReply reply = wait(timeoutError(basicLoadBalance(configNodes, &ConfigTransactionInterface::get, @@ -248,9 +274,17 @@ class PaxosConfigTransactionImpl { } ACTOR static Future getConfigClasses(PaxosConfigTransactionImpl* self) { - ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration()); - state Reference configNodes( - new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas())); + state ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration()); + state std::vector readReplicas = self->getGenerationQuorum.getReadReplicas(); + std::vector> fs; + for (ConfigTransactionInterface& readReplica : readReplicas) { + if (readReplica.hostname.present()) { + fs.push_back(tryInitializeRequestStream( + &readReplica.getClasses, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GETCLASSES)); + } + } + wait(waitForAll(fs)); + state Reference configNodes(new ConfigTransactionInfo(readReplicas)); ConfigTransactionGetConfigClassesReply reply = wait(basicLoadBalance(configNodes, &ConfigTransactionInterface::getClasses, @@ -264,9 +298,17 @@ class PaxosConfigTransactionImpl { } ACTOR static Future getKnobs(PaxosConfigTransactionImpl* self, Optional configClass) { - ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration()); - state Reference configNodes( - new ConfigTransactionInfo(self->getGenerationQuorum.getReadReplicas())); + state ConfigGeneration generation = wait(self->getGenerationQuorum.getGeneration()); + state 
std::vector readReplicas = self->getGenerationQuorum.getReadReplicas(); + std::vector> fs; + for (ConfigTransactionInterface& readReplica : readReplicas) { + if (readReplica.hostname.present()) { + fs.push_back(tryInitializeRequestStream( + &readReplica.getKnobs, readReplica.hostname.get(), WLTOKEN_CONFIGTXN_GETKNOBS)); + } + } + wait(waitForAll(fs)); + state Reference configNodes(new ConfigTransactionInfo(readReplicas)); ConfigTransactionGetKnobsReply reply = wait(basicLoadBalance(configNodes, &ConfigTransactionInterface::getKnobs, @@ -366,10 +408,13 @@ public: Future commit() { return commit(this); } PaxosConfigTransactionImpl(Database const& cx) : cx(cx) { - auto coordinators = cx->getConnectionRecord()->getConnectionString().coordinators(); - ctis.reserve(coordinators.size()); - for (const auto& coordinator : coordinators) { - ctis.emplace_back(coordinator); + const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString(); + ctis.reserve(cs.hostnames.size() + cs.coordinators().size()); + for (const auto& h : cs.hostnames) { + ctis.emplace_back(h); + } + for (const auto& c : cs.coordinators()) { + ctis.emplace_back(c); } getGenerationQuorum = GetGenerationQuorum{ ctis }; commitQuorum = CommitQuorum{ ctis }; diff --git a/fdbclient/SimpleConfigTransaction.actor.cpp b/fdbclient/SimpleConfigTransaction.actor.cpp index ecb8380d4c..2e71cd73c4 100644 --- a/fdbclient/SimpleConfigTransaction.actor.cpp +++ b/fdbclient/SimpleConfigTransaction.actor.cpp @@ -41,9 +41,15 @@ class SimpleConfigTransactionImpl { if (self->dID.present()) { TraceEvent("SimpleConfigTransactionGettingReadVersion", self->dID.get()); } - ConfigTransactionGetGenerationRequest req; - ConfigTransactionGetGenerationReply reply = - wait(retryBrokenPromise(self->cti.getGeneration, ConfigTransactionGetGenerationRequest{})); + state ConfigTransactionGetGenerationReply reply; + if (self->cti.hostname.present()) { + wait(store(reply, + 
retryGetReplyFromHostname(ConfigTransactionGetGenerationRequest{}, + self->cti.hostname.get(), + WLTOKEN_CONFIGTXN_GETGENERATION))); + } else { + wait(store(reply, retryBrokenPromise(self->cti.getGeneration, ConfigTransactionGetGenerationRequest{}))); + } if (self->dID.present()) { TraceEvent("SimpleConfigTransactionGotReadVersion", self->dID.get()) .detail("Version", reply.generation.liveVersion); @@ -62,8 +68,15 @@ class SimpleConfigTransactionImpl { .detail("ConfigClass", configKey.configClass) .detail("KnobName", configKey.knobName); } - ConfigTransactionGetReply reply = - wait(retryBrokenPromise(self->cti.get, ConfigTransactionGetRequest{ generation, configKey })); + state ConfigTransactionGetReply reply; + if (self->cti.hostname.present()) { + wait(store(reply, + retryGetReplyFromHostname(ConfigTransactionGetRequest{ generation, configKey }, + self->cti.hostname.get(), + WLTOKEN_CONFIGTXN_GET))); + } else { + wait(store(reply, retryBrokenPromise(self->cti.get, ConfigTransactionGetRequest{ generation, configKey }))); + } if (self->dID.present()) { TraceEvent("SimpleConfigTransactionGotValue", self->dID.get()) .detail("Value", reply.value.get().toString()); @@ -80,8 +93,17 @@ class SimpleConfigTransactionImpl { self->getGenerationFuture = getGeneration(self); } ConfigGeneration generation = wait(self->getGenerationFuture); - ConfigTransactionGetConfigClassesReply reply = - wait(retryBrokenPromise(self->cti.getClasses, ConfigTransactionGetConfigClassesRequest{ generation })); + state ConfigTransactionGetConfigClassesReply reply; + if (self->cti.hostname.present()) { + wait(store(reply, + retryGetReplyFromHostname(ConfigTransactionGetConfigClassesRequest{ generation }, + self->cti.hostname.get(), + WLTOKEN_CONFIGTXN_GETCLASSES))); + } else { + wait(store( + reply, + retryBrokenPromise(self->cti.getClasses, ConfigTransactionGetConfigClassesRequest{ generation }))); + } RangeResult result; for (const auto& configClass : reply.configClasses) { 
result.push_back_deep(result.arena(), KeyValueRef(configClass, ""_sr)); @@ -94,8 +116,17 @@ class SimpleConfigTransactionImpl { self->getGenerationFuture = getGeneration(self); } ConfigGeneration generation = wait(self->getGenerationFuture); - ConfigTransactionGetKnobsReply reply = - wait(retryBrokenPromise(self->cti.getKnobs, ConfigTransactionGetKnobsRequest{ generation, configClass })); + state ConfigTransactionGetKnobsReply reply; + if (self->cti.hostname.present()) { + wait(store(reply, + retryGetReplyFromHostname(ConfigTransactionGetKnobsRequest{ generation, configClass }, + self->cti.hostname.get(), + WLTOKEN_CONFIGTXN_GETKNOBS))); + } else { + wait(store( + reply, + retryBrokenPromise(self->cti.getKnobs, ConfigTransactionGetKnobsRequest{ generation, configClass }))); + } RangeResult result; for (const auto& knobName : reply.knobNames) { result.push_back_deep(result.arena(), KeyValueRef(knobName, ""_sr)); @@ -109,7 +140,11 @@ class SimpleConfigTransactionImpl { } wait(store(self->toCommit.generation, self->getGenerationFuture)); self->toCommit.annotation.timestamp = now(); - wait(retryBrokenPromise(self->cti.commit, self->toCommit)); + if (self->cti.hostname.present()) { + wait(retryGetReplyFromHostname(self->toCommit, self->cti.hostname.get(), WLTOKEN_CONFIGTXN_COMMIT)); + } else { + wait(retryBrokenPromise(self->cti.commit, self->toCommit)); + } self->committed = true; return Void(); } @@ -126,9 +161,14 @@ class SimpleConfigTransactionImpl { public: SimpleConfigTransactionImpl(Database const& cx) : cx(cx) { - auto coordinators = cx->getConnectionRecord()->getConnectionString().coordinators(); - std::sort(coordinators.begin(), coordinators.end()); - cti = ConfigTransactionInterface(coordinators[0]); + const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString(); + if (cs.coordinators().size()) { + std::vector coordinators = cs.coordinators(); + std::sort(coordinators.begin(), coordinators.end()); + cti = 
ConfigTransactionInterface(coordinators[0]); + } else { + cti = ConfigTransactionInterface(cs.hostnames[0]); + } } SimpleConfigTransactionImpl(ConfigTransactionInterface const& cti) : cti(cti) {} diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 7c95e3aa02..f018c0fc2b 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1644,13 +1644,10 @@ void TracingOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key CoordinatorsImpl::CoordinatorsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} -Future CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, - KeyRangeRef kr, - GetRangeLimits limitsHint) const { +ACTOR Future coordinatorsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRef prefix, KeyRangeRef kr) { + state ClusterConnectionString cs = ryw->getDatabase()->getConnectionRecord()->getConnectionString(); + state std::vector coordinator_processes = wait(cs.tryResolveHostnames()); RangeResult result; - KeyRef prefix(getKeyRange().begin); - auto cs = ryw->getDatabase()->getConnectionRecord()->getConnectionString(); - auto coordinator_processes = cs.coordinators(); Key cluster_decription_key = prefix.withSuffix(LiteralStringRef("cluster_description")); if (kr.contains(cluster_decription_key)) { result.push_back_deep(result.arena(), KeyValueRef(cluster_decription_key, cs.clusterKeyName())); @@ -1673,10 +1670,16 @@ Future CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, return rywGetRange(ryw, kr, result); } +Future CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const { + KeyRef prefix(getKeyRange().begin); + return coordinatorsGetRangeActor(ryw, prefix, kr); +} + ACTOR static Future> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { state Reference change; - state ClusterConnectionString - conn; // We don't care about the Key here, it will be overrode in changeQuorumChecker(). 
+ state ClusterConnectionString conn; // We don't care about the Key here. state std::vector process_address_or_hostname_strs; state Optional msg; state int index; @@ -1700,7 +1703,6 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite try { if (Hostname::isHostname(process_address_or_hostname_strs[index])) { conn.hostnames.push_back(Hostname::parse(process_address_or_hostname_strs[index])); - conn.status = ClusterConnectionString::ConnectionStringStatus::UNRESOLVED; } else { NetworkAddress a = NetworkAddress::parse(process_address_or_hostname_strs[index]); if (!a.isValid()) { @@ -1717,18 +1719,19 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite if (parse_error) { std::string error = "ERROR: \'" + process_address_or_hostname_strs[index] + "\' is not a valid network endpoint address\n"; - if (process_address_or_hostname_strs[index].find(":tls") != std::string::npos) - error += " Do not include the `:tls' suffix when naming a process\n"; return ManagementAPIError::toJsonString(false, "coordinators", error); } } } - wait(conn.resolveHostnames()); - if (conn.coordinators().size()) - change = specifiedQuorumChange(conn.coordinators()); - else + std::vector addressesVec = wait(conn.tryResolveHostnames()); + if (addressesVec.size() != conn.hostnames.size() + conn.coordinators().size()) { + return ManagementAPIError::toJsonString(false, "coordinators", "One or more hostnames are not resolvable."); + } else if (addressesVec.size()) { + change = specifiedQuorumChange(addressesVec); + } else { change = noQuorumChange(); + } // check update for cluster_description Key cluster_decription_key = LiteralStringRef("cluster_description").withPrefix(kr.begin); @@ -1740,19 +1743,18 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite change = nameQuorumChange(entry.second.get().toString(), change); } else { // throw the error - return Optional(ManagementAPIError::toJsonString( - false, "coordinators", "Cluster description must match [A-Za-z0-9_]+")); + 
return ManagementAPIError::toJsonString( + false, "coordinators", "Cluster description must match [A-Za-z0-9_]+"); } } ASSERT(change.isValid()); TraceEvent(SevDebug, "SKSChangeCoordinatorsStart") - .detail("NewHostnames", conn.hostnames.size() ? describe(conn.hostnames) : "N/A") - .detail("NewAddresses", describe(conn.coordinators())) + .detail("NewAddresses", describe(addressesVec)) .detail("Description", entry.first ? entry.second.get().toString() : ""); - Optional r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &conn)); + Optional r = wait(changeQuorumChecker(&ryw->getTransaction(), change, addressesVec)); TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish") .detail("Result", r.present() ? static_cast(r.get()) : -1); // -1 means success @@ -1804,9 +1806,10 @@ ACTOR static Future CoordinatorsAutoImplActor(ReadYourWritesTransac state ClusterConnectionString old(currentKey.get().toString()); state CoordinatorsResult result = CoordinatorsResult::SUCCESS; + std::vector oldCoordinators = wait(old.tryResolveHostnames()); std::vector _desiredCoordinators = wait(autoQuorumChange()->getDesiredCoordinators( &tr, - old.coordinators(), + oldCoordinators, Reference(new ClusterConnectionMemoryRecord(old)), result)); diff --git a/fdbclient/StatusClient.actor.cpp b/fdbclient/StatusClient.actor.cpp index 6b329ba1b9..653b8496e8 100644 --- a/fdbclient/StatusClient.actor.cpp +++ b/fdbclient/StatusClient.actor.cpp @@ -307,23 +307,35 @@ ACTOR Future> clientCoordinatorsStatusFetcher(ReferenceresolveHostnames()); state ClientCoordinators coord(connRecord); state StatusObject statusObj; state std::vector>> leaderServers; leaderServers.reserve(coord.clientLeaderServers.size()); - for (int i = 0; i < coord.clientLeaderServers.size(); i++) - leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, - GetLeaderRequest(coord.clusterKey, UID()), - TaskPriority::CoordinationReply)); + for (int i = 0; i < coord.clientLeaderServers.size(); i++) { + if 
(coord.clientLeaderServers[i].hostname.present()) { + leaderServers.push_back(retryGetReplyFromHostname(GetLeaderRequest(coord.clusterKey, UID()), + coord.clientLeaderServers[i].hostname.get(), + WLTOKEN_CLIENTLEADERREG_GETLEADER, + TaskPriority::CoordinationReply)); + } else { + leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, + GetLeaderRequest(coord.clusterKey, UID()), + TaskPriority::CoordinationReply)); + } + } state std::vector> coordProtocols; coordProtocols.reserve(coord.clientLeaderServers.size()); for (int i = 0; i < coord.clientLeaderServers.size(); i++) { - RequestStream requestStream{ Endpoint::wellKnown( - { coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO) }; - coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{})); + if (coord.clientLeaderServers[i].hostname.present()) { + coordProtocols.push_back(retryGetReplyFromHostname( + ProtocolInfoRequest{}, coord.clientLeaderServers[i].hostname.get(), WLTOKEN_PROTOCOL_INFO)); + } else { + RequestStream requestStream{ Endpoint::wellKnown( + { coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO) }; + coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{})); + } } wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) && @@ -337,8 +349,12 @@ ACTOR Future> clientCoordinatorsStatusFetcher(Reference retryBrokenPromise(RequestStream to, Req request } } +ACTOR template +Future tryInitializeRequestStream(RequestStream* stream, Hostname hostname, WellKnownEndpoints token) { + Optional address = wait(hostname.resolve()); + if (!address.present()) { + return Void(); + } + if (stream == nullptr) { + // Nothing to initialize for the caller; assigning `new` to the by-value parameter would only leak. + return Void(); + } + *stream = RequestStream(Endpoint::wellKnown({ address.get() }, token)); + return Void(); +} + ACTOR template Future> tryGetReplyFromHostname(Req request, Hostname
hostname, WellKnownEndpoints token) { // A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname. diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index a8160f82a5..4be2faca3f 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1110,10 +1110,10 @@ ACTOR Future registerWorker(RegisterWorkerRequest req, newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController); bool isCoordinator = - (std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.address()) != + (std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), w.address()) != coordinatorAddresses.end()) || - (req.wi.secondaryAddress().present() && - std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.secondaryAddress().get()) != + (w.secondaryAddress().present() && + std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), w.secondaryAddress().get()) != coordinatorAddresses.end()); for (auto it : req.incompatiblePeers) { @@ -2481,12 +2481,11 @@ ACTOR Future workerHealthMonitor(ClusterControllerData* self) { } } -ACTOR Future clusterControllerCore(Reference connRecord, - ClusterControllerFullInterface interf, +ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, Future leaderFail, + ServerCoordinators coordinators, LocalityData locality, ConfigDBType configDBType) { - state ServerCoordinators coordinators(connRecord); state ClusterControllerData self(interf, locality, coordinators); state ConfigBroadcaster configBroadcaster(coordinators, configDBType); state Future coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY); @@ -2621,7 +2620,7 @@ ACTOR Future replaceInterface(ClusterControllerFullInterface interf) { } } -ACTOR Future clusterController(Reference connRecord, +ACTOR Future clusterController(ServerCoordinators 
coordinators, Reference>> currentCC, bool hasConnected, Reference> asyncPriorityInfo, @@ -2632,10 +2631,9 @@ ACTOR Future clusterController(Reference connRec state bool inRole = false; cci.initEndpoints(); try { - wait(connRecord->resolveHostnames()); // Register as a possible leader; wait to be elected state Future leaderFail = - tryBecomeLeader(connRecord, cci, currentCC, hasConnected, asyncPriorityInfo); + tryBecomeLeader(coordinators, cci, currentCC, hasConnected, asyncPriorityInfo); state Future shouldReplace = replaceInterface(cci); while (!currentCC->get().present() || currentCC->get().get() != cci) { @@ -2654,7 +2652,7 @@ ACTOR Future clusterController(Reference connRec startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID()); inRole = true; - wait(clusterControllerCore(connRecord, cci, leaderFail, locality, configDBType)); + wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType)); } } catch (Error& e) { if (inRole) @@ -2683,7 +2681,8 @@ ACTOR Future clusterController(Reference connRec state bool hasConnected = false; loop { try { - wait(clusterController(connRecord, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType)); + ServerCoordinators coordinators(connRecord); + wait(clusterController(coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType)); hasConnected = true; } catch (Error& e) { if (e.code() != error_code_coordinators_changed) diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp index 10ed726809..86e22f7776 100644 --- a/fdbserver/ClusterRecovery.actor.cpp +++ b/fdbserver/ClusterRecovery.actor.cpp @@ -537,8 +537,7 @@ ACTOR Future changeCoordinators(Reference self) { } try { - state ClusterConnectionString conn(changeCoordinatorsRequest.newConnectionString.toString()); - wait(conn.resolveHostnames()); + ClusterConnectionString conn(changeCoordinatorsRequest.newConnectionString.toString()); wait(self->cstate.move(conn)); } catch (Error& e) { if 
(e.code() != error_code_actor_cancelled) diff --git a/fdbserver/CoordinatedState.actor.cpp b/fdbserver/CoordinatedState.actor.cpp index 7a2c5e028c..72e75bf3be 100644 --- a/fdbserver/CoordinatedState.actor.cpp +++ b/fdbserver/CoordinatedState.actor.cpp @@ -26,21 +26,29 @@ #include "fdbserver/LeaderElection.h" #include "flow/actorcompiler.h" // has to be last include -ACTOR Future waitAndSendRead(RequestStream to, - GenerationRegReadRequest req) { +ACTOR Future waitAndSendRead(GenerationRegInterface stateServer, GenerationRegReadRequest req) { if (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) wait(delay(SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY * deterministicRandom()->random01())); - state GenerationRegReadReply reply = wait(retryBrokenPromise(to, req)); + state GenerationRegReadReply reply; + if (stateServer.hostname.present()) { + wait(store(reply, retryGetReplyFromHostname(req, stateServer.hostname.get(), WLTOKEN_GENERATIONREG_READ))); + } else { + wait(store(reply, retryBrokenPromise(stateServer.read, req))); + } if (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) wait(delay(SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY * deterministicRandom()->random01())); return reply; } -ACTOR Future waitAndSendWrite(RequestStream to, - GenerationRegWriteRequest req) { +ACTOR Future waitAndSendWrite(GenerationRegInterface stateServer, GenerationRegWriteRequest req) { if (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) wait(delay(SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY * deterministicRandom()->random01())); - state UniqueGeneration reply = wait(retryBrokenPromise(to, req)); + state UniqueGeneration reply; + if (stateServer.hostname.present()) { + wait(store(reply, retryGetReplyFromHostname(req, stateServer.hostname.get(), WLTOKEN_GENERATIONREG_WRITE))); + } else { + wait(store(reply, retryBrokenPromise(stateServer.write, req))); + } if (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) wait(delay(SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY * 
deterministicRandom()->random01())); return reply; @@ -152,7 +160,7 @@ struct CoordinatedStateImpl { state std::vector> rep_reply; for (int i = 0; i < replicas.size(); i++) { Future reply = - waitAndSendRead(replicas[i].read, GenerationRegReadRequest(req.key, req.gen)); + waitAndSendRead(replicas[i], GenerationRegReadRequest(req.key, req.gen)); rep_empty_reply.push_back(nonemptyToNever(reply)); rep_reply.push_back(emptyToNever(reply)); self->ac.add(success(reply)); @@ -192,8 +200,7 @@ struct CoordinatedStateImpl { state std::vector& replicas = self->coordinators.stateServers; state std::vector> wrep_reply; for (int i = 0; i < replicas.size(); i++) { - Future reply = - waitAndSendWrite(replicas[i].write, GenerationRegWriteRequest(req.kv, req.gen)); + Future reply = waitAndSendWrite(replicas[i], GenerationRegWriteRequest(req.kv, req.gen)); wrep_reply.push_back(reply); self->ac.add(success(reply)); } diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 53086e5516..a3746c4f5e 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -98,12 +98,16 @@ LeaderElectionRegInterface::LeaderElectionRegInterface(INetwork* local) : Client } ServerCoordinators::ServerCoordinators(Reference ccr) : ClientCoordinators(ccr) { - ASSERT(ccr->connectionStringStatus() == ClusterConnectionString::RESOLVED); ClusterConnectionString cs = ccr->getConnectionString(); - for (auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) { - leaderElectionServers.emplace_back(*s); - stateServers.emplace_back(*s); - configServers.emplace_back(*s); + for (auto h : cs.hostnames) { + leaderElectionServers.emplace_back(h); + stateServers.emplace_back(h); + configServers.emplace_back(h); + } + for (auto s : cs.coordinators()) { + leaderElectionServers.emplace_back(s); + stateServers.emplace_back(s); + configServers.emplace_back(s); } } @@ -208,10 +212,8 @@ ACTOR Future openDatabase(ClientData* db, int* clientCount, Reference> 
hasConnectedClients, OpenDatabaseCoordRequest req, - Future checkStuck, - Reference> coordinatorsChanged) { + Future checkStuck) { state ErrorOr> replyContents; - state Future coordinatorsChangedOnChange = coordinatorsChanged->onChange(); state Future clientInfoOnChange = db->clientInfo->onChange(); ++(*clientCount); @@ -233,11 +235,6 @@ ACTOR Future openDatabase(ClientData* db, clientInfoOnChange = db->clientInfo->onChange(); replyContents = db->clientInfo->get(); } - when(wait(coordinatorsChangedOnChange)) { - coordinatorsChangedOnChange = coordinatorsChanged->onChange(); - replyContents = coordinators_changed(); - break; - } when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) { if (db->clientInfo->get().read().id.isValid()) { replyContents = db->clientInfo->get(); @@ -268,10 +265,7 @@ ACTOR Future openDatabase(ClientData* db, ACTOR Future remoteMonitorLeader(int* clientCount, Reference> hasConnectedClients, Reference>> currentElectedLeader, - ElectionResultRequest req, - Reference> coordinatorsChanged) { - state bool coordinatorsChangeDetected = false; - state Future coordinatorsChangedOnChange = coordinatorsChanged->onChange(); + ElectionResultRequest req) { state Future currentElectedLeaderOnChange = currentElectedLeader->onChange(); ++(*clientCount); hasConnectedClients->set(true); @@ -281,20 +275,11 @@ ACTOR Future remoteMonitorLeader(int* clientCount, when(wait(yieldedFuture(currentElectedLeaderOnChange))) { currentElectedLeaderOnChange = currentElectedLeader->onChange(); } - when(wait(coordinatorsChangedOnChange)) { - coordinatorsChangedOnChange = coordinatorsChanged->onChange(); - coordinatorsChangeDetected = true; - break; - } when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) { break; } } } - if (coordinatorsChangeDetected) { - req.reply.sendError(coordinators_changed()); - } else { - req.reply.send(currentElectedLeader->get()); - } + req.reply.send(currentElectedLeader->get()); if (--(*clientCount) == 0) { 
hasConnectedClients->set(false); @@ -325,8 +310,6 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { state Reference>> currentElectedLeader = makeReference>>(); state LivenessChecker canConnectToLeader(SERVER_KNOBS->COORDINATOR_LEADER_CONNECTION_TIMEOUT); - state Reference> coordinatorsChanged = makeReference>(); - state Future coordinatorsChangedOnChange = coordinatorsChanged->onChange(); state Future hasConnectedClientsOnChange = hasConnectedClients->onChange(); loop choose { @@ -338,14 +321,10 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { } else { if (!leaderMon.isValid()) { leaderMon = monitorLeaderAndGetClientInfo( - req.clusterKey, req.coordinators, &clientData, currentElectedLeader, coordinatorsChanged); + req.clusterKey, req.hostnames, req.coordinators, &clientData, currentElectedLeader); } - actors.add(openDatabase(&clientData, - &clientCount, - hasConnectedClients, - req, - canConnectToLeader.checkStuck(), - coordinatorsChanged)); + actors.add( + openDatabase(&clientData, &clientCount, hasConnectedClients, req, canConnectToLeader.checkStuck())); } } when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) { @@ -355,10 +334,9 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { } else { if (!leaderMon.isValid()) { leaderMon = monitorLeaderAndGetClientInfo( - req.key, req.coordinators, &clientData, currentElectedLeader, coordinatorsChanged); + req.key, req.hostnames, req.coordinators, &clientData, currentElectedLeader); } - actors.add(remoteMonitorLeader( - &clientCount, hasConnectedClients, currentElectedLeader, req, coordinatorsChanged)); + actors.add(remoteMonitorLeader(&clientCount, hasConnectedClients, currentElectedLeader, req)); } } when(GetLeaderRequest req = waitNext(interf.getLeader.getFuture())) { @@ -499,10 +477,6 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { } } when(wait(actors.getResult())) {} - 
when(wait(coordinatorsChangedOnChange)) { - leaderMon = Future(); - coordinatorsChangedOnChange = coordinatorsChanged->onChange(); - } } } diff --git a/fdbserver/CoordinationInterface.h b/fdbserver/CoordinationInterface.h index b966678cc6..2c8c037afa 100644 --- a/fdbserver/CoordinationInterface.h +++ b/fdbserver/CoordinationInterface.h @@ -153,17 +153,21 @@ struct CandidacyRequest { struct ElectionResultRequest { constexpr static FileIdentifier file_identifier = 11815465; Key key; + std::vector hostnames; std::vector coordinators; UID knownLeader; ReplyPromise> reply; ElectionResultRequest() = default; - ElectionResultRequest(Key key, std::vector coordinators, UID knownLeader) - : key(key), coordinators(std::move(coordinators)), knownLeader(knownLeader) {} + ElectionResultRequest(Key key, + std::vector hostnames, + std::vector coordinators, + UID knownLeader) + : key(key), hostnames(std::move(hostnames)), coordinators(std::move(coordinators)), knownLeader(knownLeader) {} template void serialize(Ar& ar) { - serializer(ar, key, coordinators, knownLeader, reply); + serializer(ar, key, hostnames, coordinators, knownLeader, reply); } }; diff --git a/fdbserver/LeaderElection.actor.cpp b/fdbserver/LeaderElection.actor.cpp index 272e07c1ec..3c23827e71 100644 --- a/fdbserver/LeaderElection.actor.cpp +++ b/fdbserver/LeaderElection.actor.cpp @@ -27,44 +27,29 @@ // Keep trying to become a leader by submitting itself to all coordinators. // Monitor the health of all coordinators at the same time. 
-// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor -// to throw `coordinators_changed()` error ACTOR Future submitCandidacy(Key key, LeaderElectionRegInterface coord, LeaderInfo myInfo, UID prevChangeID, AsyncTrigger* nomineeChange, - Optional* nominee, - Optional hostname = Optional()) { + Optional* nominee) { loop { state Optional li; - - if (coord.candidacy.getEndpoint().getPrimaryAddress().fromHostname) { - state ErrorOr> rep = wait(coord.candidacy.tryGetReply( - CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID), - TaskPriority::CoordinationReply)); - if (rep.isError()) { - // Connecting to nominee failed, most likely due to connection failed. - TraceEvent("SubmitCandadicyError") - .error(rep.getError()) - .detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname") - .detail("OldAddr", coord.candidacy.getEndpoint().getPrimaryAddress().toString()); - if (rep.getError().code() == error_code_request_maybe_delivered) { - // Delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL)); - throw coordinators_changed(); - } else { - throw rep.getError(); - } - } else if (rep.present()) { - li = rep.get(); - } + if (coord.hostname.present()) { + wait(store( + li, + retryGetReplyFromHostname( + CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID), + coord.hostname.get(), + WLTOKEN_LEADERELECTIONREG_CANDIDACY, + TaskPriority::CoordinationReply))); } else { - Optional tmp = wait(retryBrokenPromise( - coord.candidacy, - CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID), - TaskPriority::CoordinationReply)); - li = tmp; + wait(store( + li, + retryBrokenPromise( + coord.candidacy, + CandidacyRequest(key, myInfo, nominee->present() ? 
nominee->get().changeID : UID(), prevChangeID), + TaskPriority::CoordinationReply))); } wait(Future(Void())); // Make sure we weren't cancelled @@ -104,20 +89,26 @@ Future buggifyDelayedAsyncVar(Reference>& var) { ACTOR Future changeLeaderCoordinators(ServerCoordinators coordinators, Value forwardingInfo) { std::vector> forwardRequests; forwardRequests.reserve(coordinators.leaderElectionServers.size()); - for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) - forwardRequests.push_back(retryBrokenPromise(coordinators.leaderElectionServers[i].forward, - ForwardRequest(coordinators.clusterKey, forwardingInfo))); + for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) { + if (coordinators.leaderElectionServers[i].hostname.present()) { + forwardRequests.push_back(retryGetReplyFromHostname(ForwardRequest(coordinators.clusterKey, forwardingInfo), + coordinators.leaderElectionServers[i].hostname.get(), + WLTOKEN_LEADERELECTIONREG_FORWARD)); + } else { + forwardRequests.push_back(retryBrokenPromise(coordinators.leaderElectionServers[i].forward, + ForwardRequest(coordinators.clusterKey, forwardingInfo))); + } + } int quorum_size = forwardRequests.size() / 2 + 1; wait(quorum(forwardRequests, quorum_size)); return Void(); } -ACTOR Future tryBecomeLeaderInternal(Reference connRecord, +ACTOR Future tryBecomeLeaderInternal(ServerCoordinators coordinators, Value proposedSerializedInterface, Reference> outSerializedLeader, bool hasConnected, Reference> asyncPriorityInfo) { - state ServerCoordinators coordinators(connRecord); state AsyncTrigger nomineeChange; state std::vector> nominees; state LeaderInfo myInfo; @@ -134,6 +125,8 @@ ACTOR Future tryBecomeLeaderInternal(Reference c wait(delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); } + nominees.resize(coordinators.leaderElectionServers.size()); + myInfo.serializedInfo = proposedSerializedInterface; outSerializedLeader->set(Value()); @@ -141,9 +134,6 @@ ACTOR Future 
tryBecomeLeaderInternal(Reference c (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) ? buggifyDelayedAsyncVar(outSerializedLeader) : Void(); while (!iAmLeader) { - wait(connRecord->resolveHostnames()); - coordinators = ServerCoordinators(connRecord); - nominees.resize(coordinators.leaderElectionServers.size()); state Future badCandidateTimeout; myInfo.changeID = deterministicRandom()->randomUniqueID(); @@ -153,19 +143,12 @@ ACTOR Future tryBecomeLeaderInternal(Reference c std::vector> cand; cand.reserve(coordinators.leaderElectionServers.size()); for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) { - Optional hostname; - auto r = connRecord->getConnectionString().networkAddressToHostname.find( - coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress()); - if (r != connRecord->getConnectionString().networkAddressToHostname.end()) { - hostname = r->second; - } cand.push_back(submitCandidacy(coordinators.clusterKey, coordinators.leaderElectionServers[i], myInfo, prevChangeID, &nomineeChange, - &nominees[i], - hostname)); + &nominees[i])); } candidacies = waitForAll(cand); @@ -220,24 +203,15 @@ ACTOR Future tryBecomeLeaderInternal(Reference c } else badCandidateTimeout = Future(); - try { - choose { - when(wait(nomineeChange.onTrigger())) {} - when(wait(badCandidateTimeout.isValid() ? badCandidateTimeout : Never())) { - TEST(true); // Bad candidate timeout - TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log(); - break; - } - when(wait(candidacies)) { ASSERT(false); } - when(wait(asyncPriorityInfo->onChange())) { break; } - } - } catch (Error& e) { - if (e.code() == error_code_coordinators_changed) { - connRecord->getConnectionString().resetToUnresolved(); + choose { + when(wait(nomineeChange.onTrigger())) {} + when(wait(badCandidateTimeout.isValid() ? 
badCandidateTimeout : Never())) { + TEST(true); // Bad candidate timeout + TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log(); break; - } else { - throw e; } + when(wait(candidacies)) { ASSERT(false); } + when(wait(asyncPriorityInfo->onChange())) { break; } } } @@ -258,10 +232,17 @@ ACTOR Future tryBecomeLeaderInternal(Reference c state std::vector> true_heartbeats; state std::vector> false_heartbeats; for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) { - Future hb = - retryBrokenPromise(coordinators.leaderElectionServers[i].leaderHeartbeat, - LeaderHeartbeatRequest(coordinators.clusterKey, myInfo, prevChangeID), - TaskPriority::CoordinationReply); + Future hb; + if (coordinators.leaderElectionServers[i].hostname.present()) { + hb = retryGetReplyFromHostname(LeaderHeartbeatRequest(coordinators.clusterKey, myInfo, prevChangeID), + coordinators.leaderElectionServers[i].hostname.get(), + WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT, + TaskPriority::CoordinationReply); + } else { + hb = retryBrokenPromise(coordinators.leaderElectionServers[i].leaderHeartbeat, + LeaderHeartbeatRequest(coordinators.clusterKey, myInfo, prevChangeID), + TaskPriority::CoordinationReply); + } true_heartbeats.push_back(onEqual(hb, LeaderHeartbeatReply{ true })); false_heartbeats.push_back(onEqual(hb, LeaderHeartbeatReply{ false })); } diff --git a/fdbserver/LeaderElection.h b/fdbserver/LeaderElection.h index 9639116ec5..ad5d959ce5 100644 --- a/fdbserver/LeaderElection.h +++ b/fdbserver/LeaderElection.h @@ -37,7 +37,7 @@ class ServerCoordinators; // eventually be set. If the return value is cancelled, the candidacy or leadership of the proposedInterface // will eventually end. 
template -Future tryBecomeLeader(Reference const& connRecord, +Future tryBecomeLeader(ServerCoordinators const& coordinators, LeaderInterface const& proposedInterface, Reference>> const& outKnownLeader, bool hasConnected, @@ -50,20 +50,20 @@ Future changeLeaderCoordinators(ServerCoordinators const& coordinators, Va #pragma region Implementation #endif // __INTEL_COMPILER -Future tryBecomeLeaderInternal(Reference const& connRecord, +Future tryBecomeLeaderInternal(ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference> const& outSerializedLeader, bool const& hasConnected, Reference> const& asyncPriorityInfo); template -Future tryBecomeLeader(Reference const& connRecord, +Future tryBecomeLeader(ServerCoordinators const& coordinators, LeaderInterface const& proposedInterface, Reference>> const& outKnownLeader, bool hasConnected, Reference> const& asyncPriorityInfo) { auto serializedInfo = makeReference>(); - Future m = tryBecomeLeaderInternal(connRecord, + Future m = tryBecomeLeaderInternal(coordinators, ObjectWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, diff --git a/fdbserver/PaxosConfigConsumer.actor.cpp b/fdbserver/PaxosConfigConsumer.actor.cpp index 934efccfa3..eb8e4f70f3 100644 --- a/fdbserver/PaxosConfigConsumer.actor.cpp +++ b/fdbserver/PaxosConfigConsumer.actor.cpp @@ -99,8 +99,17 @@ class GetCommittedVersionQuorum { // Now roll node forward to match the largest committed version of // the replies. 
- state Reference quorumCfi(new ConfigFollowerInfo(self->replies[target])); try { + state std::vector interfs = self->replies[target]; + std::vector> fs; + for (ConfigFollowerInterface& interf : interfs) { + if (interf.hostname.present()) { + fs.push_back(tryInitializeRequestStream( + &interf.getChanges, interf.hostname.get(), WLTOKEN_CONFIGFOLLOWER_GETCHANGES)); + } + } + wait(waitForAll(fs)); + state Reference quorumCfi(new ConfigFollowerInfo(interfs)); state Version lastSeenVersion = std::max( rollback.present() ? rollback.get() : nodeVersion.lastCommitted, self->largestCompactedResponse); ConfigFollowerGetChangesReply reply = @@ -129,9 +138,18 @@ class GetCommittedVersionQuorum { ACTOR static Future getCommittedVersionActor(GetCommittedVersionQuorum* self, ConfigFollowerInterface cfi) { try { - ConfigFollowerGetCommittedVersionReply reply = - wait(timeoutError(cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}), - SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT)); + state ConfigFollowerGetCommittedVersionReply reply; + if (cfi.hostname.present()) { + wait(timeoutError(store(reply, + retryGetReplyFromHostname(ConfigFollowerGetCommittedVersionRequest{}, + cfi.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION)), + SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT)); + } else { + wait(timeoutError( + store(reply, cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{})), + SERVER_KNOBS->GET_COMMITTED_VERSION_TIMEOUT)); + } ++self->totalRepliesReceived; self->largestCompactedResponse = std::max(self->largestCompactedResponse, reply.lastCompacted); @@ -279,7 +297,15 @@ class PaxosConfigConsumerImpl { std::vector> compactionRequests; compactionRequests.reserve(compactionRequests.size()); for (const auto& cfi : self->cfis) { - compactionRequests.push_back(cfi.compact.getReply(ConfigFollowerCompactRequest{ compactionVersion })); + if (cfi.hostname.present()) { + compactionRequests.push_back( + 
retryGetReplyFromHostname(ConfigFollowerCompactRequest{ compactionVersion }, + cfi.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_COMPACT)); + } else { + compactionRequests.push_back( + cfi.compact.getReply(ConfigFollowerCompactRequest{ compactionVersion })); + } } try { wait(timeoutError(waitForAll(compactionRequests), 1.0)); @@ -294,8 +320,18 @@ class PaxosConfigConsumerImpl { self->resetCommittedVersionQuorum(); // TODO: This seems to fix a segfault, investigate more try { state Version committedVersion = wait(getCommittedVersion(self)); - state Reference configNodes( - new ConfigFollowerInfo(self->getCommittedVersionQuorum.getReadReplicas())); + state std::vector readReplicas = + self->getCommittedVersionQuorum.getReadReplicas(); + std::vector> fs; + for (ConfigFollowerInterface& readReplica : readReplicas) { + if (readReplica.hostname.present()) { + fs.push_back(tryInitializeRequestStream(&readReplica.getSnapshotAndChanges, + readReplica.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES)); + } + } + wait(waitForAll(fs)); + state Reference configNodes(new ConfigFollowerInfo(readReplicas)); ConfigFollowerGetSnapshotAndChangesReply reply = wait(timeoutError(basicLoadBalance(configNodes, &ConfigFollowerInterface::getSnapshotAndChanges, @@ -349,8 +385,18 @@ class PaxosConfigConsumerImpl { // returned would be 1. 
if (committedVersion > self->lastSeenVersion) { ASSERT(self->getCommittedVersionQuorum.getReadReplicas().size() >= self->cfis.size() / 2 + 1); - state Reference configNodes( - new ConfigFollowerInfo(self->getCommittedVersionQuorum.getReadReplicas())); + state std::vector readReplicas = + self->getCommittedVersionQuorum.getReadReplicas(); + std::vector> fs; + for (ConfigFollowerInterface& readReplica : readReplicas) { + if (readReplica.hostname.present()) { + fs.push_back(tryInitializeRequestStream(&readReplica.getChanges, + readReplica.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_GETCHANGES)); + } + } + wait(waitForAll(fs)); + state Reference configNodes(new ConfigFollowerInfo(readReplicas)); ConfigFollowerGetChangesReply reply = wait(timeoutError( basicLoadBalance(configNodes, &ConfigFollowerInterface::getChanges, diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index b69d7930ee..014e091b79 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -161,9 +161,8 @@ ACTOR Future> getCoordWorkers(Database cx, if (!coordinators.present()) { throw operation_failed(); } - state ClusterConnectionString ccs(coordinators.get().toString()); - wait(ccs.resolveHostnames()); - std::vector coordinatorsAddr = ccs.coordinators(); + ClusterConnectionString ccs(coordinators.get().toString()); + std::vector coordinatorsAddr = wait(ccs.tryResolveHostnames()); std::set coordinatorsAddrSet; for (const auto& addr : coordinatorsAddr) { TraceEvent(SevDebug, "CoordinatorAddress").detail("Addr", addr); diff --git a/fdbserver/SimpleConfigConsumer.actor.cpp b/fdbserver/SimpleConfigConsumer.actor.cpp index a2cbfba9db..ddad99c80d 100644 --- a/fdbserver/SimpleConfigConsumer.actor.cpp +++ b/fdbserver/SimpleConfigConsumer.actor.cpp @@ -44,15 +44,29 @@ class SimpleConfigConsumerImpl { loop { state Version compactionVersion = self->lastSeenVersion; wait(delayJittered(self->compactionInterval.get())); - 
wait(self->cfi.compact.getReply(ConfigFollowerCompactRequest{ compactionVersion })); + if (self->cfi.hostname.present()) { + wait(retryGetReplyFromHostname(ConfigFollowerCompactRequest{ compactionVersion }, + self->cfi.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_COMPACT)); + } else { + wait(self->cfi.compact.getReply(ConfigFollowerCompactRequest{ compactionVersion })); + } ++self->compactRequest; broadcaster->compact(compactionVersion); } } ACTOR static Future getCommittedVersion(SimpleConfigConsumerImpl* self) { - ConfigFollowerGetCommittedVersionReply committedVersionReply = - wait(self->cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{})); + state ConfigFollowerGetCommittedVersionReply committedVersionReply; + if (self->cfi.hostname.present()) { + wait(store(committedVersionReply, + retryGetReplyFromHostname(ConfigFollowerGetCommittedVersionRequest{}, + self->cfi.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_GETCOMMITTEDVERSION))); + } else { + wait(store(committedVersionReply, + self->cfi.getCommittedVersion.getReply(ConfigFollowerGetCommittedVersionRequest{}))); + } return committedVersionReply.lastCommitted; } @@ -63,8 +77,18 @@ class SimpleConfigConsumerImpl { state Version committedVersion = wait(getCommittedVersion(self)); ASSERT_GE(committedVersion, self->lastSeenVersion); if (committedVersion > self->lastSeenVersion) { - ConfigFollowerGetChangesReply reply = wait(self->cfi.getChanges.getReply( - ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion })); + state ConfigFollowerGetChangesReply reply; + if (self->cfi.hostname.present()) { + wait(store(reply, + retryGetReplyFromHostname( + ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }, + self->cfi.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_GETCHANGES))); + } else { + wait(store(reply, + self->cfi.getChanges.getReply( + ConfigFollowerGetChangesRequest{ self->lastSeenVersion, committedVersion }))); + } ++self->successfulChangeRequest; for (const 
auto& versionedMutation : reply.changes) { TraceEvent te(SevDebug, "ConsumerFetchedMutation", self->id); @@ -96,8 +120,17 @@ class SimpleConfigConsumerImpl { ACTOR static Future getSnapshotAndChanges(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) { state Version committedVersion = wait(getCommittedVersion(self)); - ConfigFollowerGetSnapshotAndChangesReply reply = wait( - self->cfi.getSnapshotAndChanges.getReply(ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion })); + state ConfigFollowerGetSnapshotAndChangesReply reply; + if (self->cfi.hostname.present()) { + wait(store(reply, + retryGetReplyFromHostname(ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }, + self->cfi.hostname.get(), + WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES))); + } else { + wait(store(reply, + self->cfi.getSnapshotAndChanges.getReply( + ConfigFollowerGetSnapshotAndChangesRequest{ committedVersion }))); + } ++self->snapshotRequest; TraceEvent(SevDebug, "ConfigConsumerGotSnapshotAndChanges", self->id) .detail("SnapshotVersion", reply.snapshotVersion) diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 4a183ac1de..9e825aa276 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -1980,8 +1980,8 @@ void setupSimulatedSystem(std::vector>* systemActors, TEST(useIPv6); // Use IPv6 TEST(!useIPv6); // Use IPv4 - // TODO(renxuan): Use hostname 25% of the time, unless it is disabled - bool useHostname = false; // !testConfig.disableHostname && deterministicRandom()->random01() < 0.25; + // Use hostname 25% of the time, unless it is disabled + bool useHostname = !testConfig.disableHostname && deterministicRandom()->random01() < 0.25; TEST(useHostname); // Use hostname TEST(!useHostname); // Use IP address NetworkAddressFromHostname fromHostname = diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index f086265d49..52ef7c20b5 100644 --- a/fdbserver/Status.actor.cpp 
+++ b/fdbserver/Status.actor.cpp @@ -831,7 +831,8 @@ ACTOR static Future processStatusFetcher( } } - for (auto& coordinator : coordinators.ccr->getConnectionString().coordinators()) { + std::vector addressVec = wait(coordinators.ccr->getConnectionString().tryResolveHostnames()); + for (const auto& coordinator : addressVec) { roles.addCoordinatorRole(coordinator); } @@ -1689,8 +1690,7 @@ static JsonBuilderObject configurationFetcher(Optional co } statusObj["excluded_servers"] = excludedServersArr; } - std::vector coordinatorLeaderServers = coordinators.clientLeaderServers; - int count = coordinatorLeaderServers.size(); + int count = coordinators.clientLeaderServers.size(); statusObj["coordinators_count"] = count; } catch (Error&) { incomplete_reasons->insert("Could not retrieve all configuration status information."); @@ -2505,7 +2505,8 @@ static JsonBuilderArray tlogFetcher(int* logFaultTolerance, static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration configuration, ServerCoordinators coordinators, - std::vector& workers, + const std::vector& coordinatorAddresses, + const std::vector& workers, int extraTlogEligibleZones, int minStorageReplicasRemaining, int oldLogFaultTolerance, @@ -2521,11 +2522,11 @@ static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration confi int maxCoordinatorFailures = (coordinators.clientLeaderServers.size() - 1) / 2; std::map workerZones; - for (auto& worker : workers) { + for (const auto& worker : workers) { workerZones[worker.interf.address()] = worker.interf.locality.zoneId().orDefault(LiteralStringRef("")); } std::map coordinatorZoneCounts; - for (auto& coordinator : coordinators.ccr->getConnectionString().coordinators()) { + for (const auto& coordinator : coordinatorAddresses) { auto zone = workerZones[coordinator]; coordinatorZoneCounts[zone] += 1; } @@ -3061,6 +3062,9 @@ ACTOR Future clusterGetStatus( state std::vector workerStatuses = wait(getAll(futures2)); wait(success(primaryDCFO)); + 
std::vector coordinatorAddresses = + wait(coordinators.ccr->getConnectionString().tryResolveHostnames()); + int logFaultTolerance = 100; if (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) { statusObj["logs"] = tlogFetcher(&logFaultTolerance, db, address_workers); @@ -3070,6 +3074,7 @@ ACTOR Future clusterGetStatus( statusObj["fault_tolerance"] = faultToleranceStatusFetcher(configuration.get(), coordinators, + coordinatorAddresses, workers, extraTlogEligibleZones, minStorageReplicasRemaining, diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 3a2cc88f0a..1a2f244b0b 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -859,9 +859,9 @@ std::pair buildNetworkAddresses( NetworkAddressList publicNetworkAddresses; NetworkAddressList listenNetworkAddresses; - connectionRecord.resolveHostnamesBlocking(); - auto& coordinators = connectionRecord.getConnectionString().coordinators(); - ASSERT(coordinators.size() > 0); + std::vector& hostnames = connectionRecord.getConnectionString().hostnames; + const std::vector& coords = connectionRecord.getConnectionString().coordinators(); + ASSERT(hostnames.size() + coords.size() > 0); for (int ii = 0; ii < publicAddressStrs.size(); ++ii) { const std::string& publicAddressStr = publicAddressStrs[ii]; @@ -930,13 +930,26 @@ std::pair buildNetworkAddresses( listenNetworkAddresses.secondaryAddress = currentListenAddress; } - bool hasSameCoord = std::all_of(coordinators.begin(), coordinators.end(), [&](const NetworkAddress& address) { + bool matchCoordinatorsTls = std::all_of(coords.begin(), coords.end(), [&](const NetworkAddress& address) { if (address.ip == currentPublicAddress.ip && address.port == currentPublicAddress.port) { return address.isTLS() == currentPublicAddress.isTLS(); } return true; }); - if (!hasSameCoord) { + // If true, further check hostnames. 
+ if (matchCoordinatorsTls) { + matchCoordinatorsTls = std::all_of(hostnames.begin(), hostnames.end(), [&](Hostname& hostname) { + Optional resolvedAddress = hostname.resolveBlocking(); + if (resolvedAddress.present()) { + NetworkAddress address = resolvedAddress.get(); + if (address.ip == currentPublicAddress.ip && address.port == currentPublicAddress.port) { + return address.isTLS() == currentPublicAddress.isTLS(); + } + } + return true; + }); + } + if (!matchCoordinatorsTls) { fprintf(stderr, "ERROR: TLS state of public address %s does not match in coordinator list.\n", publicAddressStr.c_str()); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index a777588037..932a6d9a8a 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -2977,21 +2977,43 @@ ACTOR Future monitorLeaderWithDelayedCandidacyImplOneGenerati Reference connRecord, Reference> result, MonitorLeaderInfo info) { - state ClusterConnectionString ccf = info.intermediateConnRecord->getConnectionString(); - state std::vector addrs = ccf.coordinators(); + ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString(); + std::vector hostnames; + state int coordinatorsSize = cs.hostnames.size() + cs.coordinators().size(); state ElectionResultRequest request; state int index = 0; state int successIndex = 0; - request.key = ccf.clusterKey(); - request.coordinators = ccf.coordinators(); + state std::vector leaderElectionServers; - deterministicRandom()->randomShuffle(addrs); + hostnames.reserve(cs.hostnames.size()); + leaderElectionServers.reserve(coordinatorsSize); + for (const auto& h : cs.hostnames) { + hostnames.push_back(h); + leaderElectionServers.push_back(LeaderElectionRegInterface(h)); + } + for (const auto& c : cs.coordinators()) { + leaderElectionServers.push_back(LeaderElectionRegInterface(c)); + } + deterministicRandom()->randomShuffle(leaderElectionServers); + + request.key = cs.clusterKey(); + request.hostnames = hostnames; + 
request.coordinators = cs.coordinators(); loop { - LeaderElectionRegInterface interf(addrs[index]); + LeaderElectionRegInterface interf = leaderElectionServers[index]; + bool usingHostname = interf.hostname.present(); request.reply = ReplyPromise>(); - ErrorOr> leader = wait(interf.electionResult.tryGetReply(request)); + state ErrorOr> leader; + if (usingHostname) { + wait(store( + leader, + tryGetReplyFromHostname(request, interf.hostname.get(), WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT))); + } else { + wait(store(leader, interf.electionResult.tryGetReply(request))); + } + if (leader.present()) { if (leader.get().present()) { if (leader.get().get().forward) { @@ -3027,14 +3049,9 @@ ACTOR Future monitorLeaderWithDelayedCandidacyImplOneGenerati } successIndex = index; } else { - if (leader.isError() && leader.getError().code() == error_code_coordinators_changed) { - info.intermediateConnRecord->getConnectionString().resetToUnresolved(); - throw coordinators_changed(); - } - index = (index + 1) % addrs.size(); + index = (index + 1) % coordinatorsSize; if (index == successIndex) { wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY)); - throw coordinators_changed(); } } } @@ -3042,22 +3059,11 @@ ACTOR Future monitorLeaderWithDelayedCandidacyImplOneGenerati ACTOR Future monitorLeaderWithDelayedCandidacyImplInternal(Reference connRecord, Reference> outSerializedLeaderInfo) { - wait(connRecord->resolveHostnames()); state MonitorLeaderInfo info(connRecord); loop { - try { - wait(info.intermediateConnRecord->resolveHostnames()); - MonitorLeaderInfo _info = - wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connRecord, outSerializedLeaderInfo, info)); - info = _info; - } catch (Error& e) { - if (e.code() == error_code_coordinators_changed) { - TraceEvent("MonitorLeaderWithDelayedCandidacyCoordinatorsChanged").suppressFor(1.0); - info.intermediateConnRecord->getConnectionString().resetToUnresolved(); - } else { - throw e; - } - } + MonitorLeaderInfo _info = + 
wait(monitorLeaderWithDelayedCandidacyImplOneGeneration(connRecord, outSerializedLeaderInfo, info)); + info = _info; } } @@ -3191,6 +3197,7 @@ ACTOR Future fdbd(Reference connRecord, actors.push_back(serveProcess()); try { + ServerCoordinators coordinators(connRecord); if (g_network->isSimulated()) { whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,"; } diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 16d39a7f1e..fd352ab299 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -2096,7 +2096,8 @@ struct ConsistencyCheckWorkload : TestWorkload { return false; } - state ClusterConnectionString old(currentKey.get().toString()); + ClusterConnectionString old(currentKey.get().toString()); + state std::vector oldCoordinators = wait(old.tryResolveHostnames()); std::vector workers = wait(::getWorkers(&tr)); @@ -2106,7 +2107,7 @@ struct ConsistencyCheckWorkload : TestWorkload { } std::set>> checkDuplicates; - for (const auto& addr : old.coordinators()) { + for (const auto& addr : oldCoordinators) { auto findResult = addr_locality.find(addr); if (findResult != addr_locality.end()) { if (checkDuplicates.count(findResult->second.zoneId())) { diff --git a/fdbserver/workloads/RemoveServersSafely.actor.cpp b/fdbserver/workloads/RemoveServersSafely.actor.cpp index d2550394d1..6be4a58cee 100644 --- a/fdbserver/workloads/RemoveServersSafely.actor.cpp +++ b/fdbserver/workloads/RemoveServersSafely.actor.cpp @@ -541,7 +541,12 @@ struct RemoveServersSafelyWorkload : TestWorkload { state AddressExclusion coordExcl; // Exclude a coordinator under buggify, but only if fault tolerance is > 0 and kill set is non-empty already if (BUGGIFY && toKill.size()) { - std::vector coordinators = wait(getCoordinators(cx)); + Optional csOptional = wait(getConnectionString(cx)); + state std::vector coordinators; + if (csOptional.present()) { + ClusterConnectionString cs 
= csOptional.get(); + wait(store(coordinators, cs.tryResolveHostnames())); + } if (coordinators.size() > 2) { auto randomCoordinator = deterministicRandom()->randomChoice(coordinators); coordExcl = AddressExclusion(randomCoordinator.ip, randomCoordinator.port); diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index 47698572b3..cd5d2b7d80 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -957,9 +957,9 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { boost::split( process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; }); ASSERT(process_addresses.size() == cs.coordinators().size() + cs.hostnames.size()); - wait(cs.resolveHostnames()); // compare the coordinator process network addresses one by one - for (const auto& network_address : cs.coordinators()) { + std::vector coordinators = wait(cs.tryResolveHostnames()); + for (const auto& network_address : coordinators) { ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) != process_addresses.end()); } @@ -1077,19 +1077,20 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); Optional res = wait(tx->get(coordinatorsKey)); ASSERT(res.present()); // Otherwise, database is in a bad state - state ClusterConnectionString csNew(res.get().toString()); - wait(csNew.resolveHostnames()); - ASSERT(csNew.coordinators().size() == old_coordinators_processes.size() + 1); + ClusterConnectionString csNew(res.get().toString()); + // verify the cluster description + ASSERT(new_cluster_description == csNew.clusterKeyName().toString()); + ASSERT(csNew.hostnames.size() + csNew.coordinators().size() == + old_coordinators_processes.size() + 1); + std::vector newCoordinators = wait(csNew.tryResolveHostnames()); // verify the
coordinators' addresses - for (const auto& network_address : csNew.coordinators()) { + for (const auto& network_address : newCoordinators) { std::string address_str = network_address.toString(); ASSERT(std::find(old_coordinators_processes.begin(), old_coordinators_processes.end(), address_str) != old_coordinators_processes.end() || new_coordinator_process == address_str); } - // verify the cluster decription - ASSERT(new_cluster_description == csNew.clusterKeyName().toString()); tx->reset(); } catch (Error& e) { wait(tx->onError(e));