From 64b45088ae703aa4202d5a477750dde6c41c9764 Mon Sep 17 00:00:00 2001 From: "Johannes M. Scheuermann" Date: Fri, 20 Oct 2023 15:33:08 +0200 Subject: [PATCH] Make sure server list is validated against the excluded localities --- fdbcli/ExcludeCommand.actor.cpp | 48 +++++++++++-------- fdbclient/DatabaseConfiguration.cpp | 16 +------ fdbclient/ManagementAPI.actor.cpp | 47 +++++++++++++++--- fdbclient/SpecialKeySpace.actor.cpp | 40 +++++++++++++++- .../include/fdbclient/DatabaseConfiguration.h | 1 - .../include/fdbclient/ManagementAPI.actor.h | 12 +++-- fdbrpc/Locality.cpp | 1 - fdbrpc/include/fdbrpc/Locality.h | 1 - 8 files changed, 116 insertions(+), 50 deletions(-) diff --git a/fdbcli/ExcludeCommand.actor.cpp b/fdbcli/ExcludeCommand.actor.cpp index 916da53a55..39a0d0bc08 100644 --- a/fdbcli/ExcludeCommand.actor.cpp +++ b/fdbcli/ExcludeCommand.actor.cpp @@ -170,9 +170,8 @@ ACTOR Future> getFailedLocalities(Reference } ACTOR Future> checkForExcludingServers(Reference db, - std::vector excl, + std::set exclusions, bool waitForAllExcluded) { - state std::set exclusions(excl.begin(), excl.end()); state std::set inProgressExclusion; state Reference tr = db->createTransaction(); loop { @@ -220,7 +219,7 @@ ACTOR Future> checkForExcludingServers(Reference checkForCoordinators(Reference db, std::vector exclusionVector) { +ACTOR Future checkForCoordinators(Reference db, std::set exclusions) { state bool foundCoordinator = false; state std::vector coordinatorList; @@ -237,9 +236,10 @@ ACTOR Future checkForCoordinators(Reference db, std::vectoronError(e))); } } + for (const auto& c : coordinatorList) { - if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) || - std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) { + if (exclusions.find(AddressExclusion(c.ip, c.port)) != exclusions.end() || + exclusions.find(AddressExclusion(c.ip)) != exclusions.end()) { fprintf(stderr, "WARNING: %s is a coordinator!\n", 
c.toString().c_str()); foundCoordinator = true; } @@ -310,7 +310,6 @@ ACTOR Future excludeCommandActor(Reference db, std::vector exclusionVector; state std::set exclusionSet; state std::vector exclusionAddresses; state std::unordered_set exclusionLocalities; @@ -319,9 +318,11 @@ ACTOR Future excludeCommandActor(Reference db, std::vector workers; - bool result = wait(fdb_cli::getWorkers(db, &workers)); - if (!result) - return false; + state std::map server_interfaces; + state Future future_workers = fdb_cli::getWorkers(db, &workers); + state Future future_server_interfaces = fdb_cli::getStorageServerInterfaces(db, &server_interfaces); + wait(success(future_workers) && success(future_server_interfaces)); + for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) { if (*t == "FORCE"_sr) { force = true; @@ -331,15 +332,21 @@ ACTOR Future excludeCommandActor(Reference db, std::vectorstartsWith(LocalityData::ExcludeLocalityPrefix) && t->toString().find(':') != std::string::npos) { - std::set localityAddresses = getAddressesByLocality(workers, t->toString()); - if (localityAddresses.empty()) { + exclusionLocalities.insert(t->toString()); + auto localityAddresses = getAddressesByLocality(workers, t->toString()); + auto localityServerAddresses = getServerAddressesByLocality(server_interfaces, t->toString()); + if (localityAddresses.empty() && localityServerAddresses.empty()) { noMatchLocalities.push_back(t->toString()); - } else { - // add all the server ipaddresses that belong to the given localities to the exclusionSet. 
- exclusionVector.insert(exclusionVector.end(), localityAddresses.begin(), localityAddresses.end()); + continue; + } + + if (!localityAddresses.empty()) { exclusionSet.insert(localityAddresses.begin(), localityAddresses.end()); } - exclusionLocalities.insert(t->toString()); + + if (!localityServerAddresses.empty()) { + exclusionSet.insert(localityServerAddresses.begin(), localityServerAddresses.end()); + } } else { auto a = AddressExclusion::parse(*t); if (!a.isValid()) { @@ -350,13 +357,12 @@ ACTOR Future excludeCommandActor(Reference db, std::vector excludeCommandActor(Reference db, std::vector notExcludedServers = - wait(checkForExcludingServers(db, exclusionVector, waitForAllExcluded)); + wait(checkForExcludingServers(db, exclusionSet, waitForAllExcluded)); std::map> workerPorts; for (auto addr : workers) workerPorts[addr.address.ip].insert(addr.address.port); // Print a list of all excluded addresses that don't have a corresponding worker std::set absentExclusions; - for (const auto& addr : exclusionVector) { + for (const auto& addr : exclusionSet) { auto worker = workerPorts.find(addr.ip); if (worker == workerPorts.end()) absentExclusions.insert(addr); @@ -389,7 +395,7 @@ ACTOR Future excludeCommandActor(Reference db, std::vector excludeCommandActor(Reference db, std::vector DatabaseConfiguration::getExcludedLocalities() const { // TODO: revisit all const_cast usages @@ -897,4 +883,4 @@ TEST_CASE("/fdbclient/databaseConfiguration/overwriteCommitProxy") { ASSERT(conf1.getDesiredCommitProxies() == conf2.getDesiredCommitProxies()); return Void(); -} \ No newline at end of file +} diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 61da39673c..c43d526177 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -2146,21 +2146,56 @@ std::pair decodeLocality(const std::string& locality) return std::make_pair("", ""); } +// Returns the list of IPAddresses of the servers that match the given 
locality. +// Example: locality="locality_dcid:primary" returns all the ip addresses of the servers in the primary dc. +std::set getServerAddressesByLocality( + const std::map server_interfaces, + const std::string& locality) { + std::pair locality_key_value = decodeLocality(locality); + std::set locality_addresses; + + for (auto& server : server_interfaces) { + auto locality_value = server.second.locality.get(locality_key_value.first); + if (!locality_value.present()) { + continue; + } + + if (locality_value.get() != locality_key_value.second) { + continue; + } + + auto primary_address = server.second.address(); + locality_addresses.insert(AddressExclusion(primary_address.ip, primary_address.port)); + if (server.second.secondaryAddress().present()) { + auto secondary_address = server.second.secondaryAddress().get(); + locality_addresses.insert(AddressExclusion(secondary_address.ip, secondary_address.port)); + } + } + + return locality_addresses; +} + // Returns the list of IPAddresses of the workers that match the given locality. // Example: locality="locality_dcid:primary" returns all the ip addresses of the workers in the primary dc. 
std::set getAddressesByLocality(const std::vector& workers, const std::string& locality) { - std::pair localityKeyValue = decodeLocality(locality); + std::pair locality_key_value = decodeLocality(locality); + std::set locality_addresses; - std::set localityAddresses; for (int i = 0; i < workers.size(); i++) { - auto localityValue = workers[i].locality.get(localityKeyValue.first); - if (localityValue.present() && localityValue.get() == localityKeyValue.second) { - localityAddresses.insert(AddressExclusion(workers[i].address.ip, workers[i].address.port)); + auto locality_value = workers[i].locality.get(locality_key_value.first); + if (!locality_value.present()) { + continue; } + + if (locality_value.get() != locality_key_value.second) { + continue; + } + + locality_addresses.insert(AddressExclusion(workers[i].address.ip, workers[i].address.port)); } - return localityAddresses; + return locality_addresses; } ACTOR Future printHealthyZone(Database cx) { diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 8bfd32ffc0..fbe04ad838 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1219,7 +1219,12 @@ ACTOR Future ExclusionInProgressActor(ReadYourWritesTransaction* ry tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); // necessary? 
tr.setOption(FDBTransactionOptions::LOCK_AWARE); - state std::vector excl = wait((getAllExcludedServers(&tr))); + state Future> fExclusions = getAllExcludedServers(&tr); + state Future> fExcludedLocalities = getAllExcludedLocalities(&tr); + + wait(success(fExclusions) && success(fExcludedLocalities)); + + state std::vector excl = fExclusions.get(); state std::set exclusions(excl.begin(), excl.end()); state std::set inProgressExclusion; // Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed @@ -1227,18 +1232,48 @@ ACTOR Future ExclusionInProgressActor(ReadYourWritesTransaction* ry state RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY)); ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY); + // We have to make use of the localities here to verify if a server is still in the server list, + // even if it might be missing in the workers as the server is not running anymore. + state std::vector excludedLocalities = fExcludedLocalities.get(); + // Decode the excluded localities to check if any server is excluded by locality. + state std::vector> decodedExcludedLocalities; + for (auto& excludedLocality : excludedLocalities) { + decodedExcludedLocalities.push_back(decodeLocality(excludedLocality)); + } + for (auto& s : serverList) { - auto addresses = decodeServerListValue(s.value).getKeyValues.getEndpoint().addresses; + auto decodedServer = decodeServerListValue(s.value); + auto addresses = decodedServer.getKeyValues.getEndpoint().addresses; if (addressExcluded(exclusions, addresses.address)) { inProgressExclusion.insert(addresses.address); } + if (addresses.secondaryAddress.present() && addressExcluded(exclusions, addresses.secondaryAddress.get())) { inProgressExclusion.insert(addresses.secondaryAddress.get()); } + + // Check if the server is excluded based on a locality. 
+ for (auto& excludedLocality : decodedExcludedLocalities) { + if (!decodedServer.locality.isPresent(excludedLocality.first)) { + continue; + } + + if (decodedServer.locality.get(excludedLocality.first) != excludedLocality.second) { + continue; + } + + inProgressExclusion.insert(addresses.address); + if (addresses.secondaryAddress.present()) { + inProgressExclusion.insert(addresses.secondaryAddress.get()); + } + } } Optional> value = wait(tr.get(logsKey)); ASSERT(value.present()); + // TODO(jscheuermann): The logs key range doesn't hold any information about localities. This is a limitation + // for locality-based exclusions. The problematic edge case here is a log server that still has mutations on it + // but is currently not part of the worker list, e.g. because it was shut down or is partitioned. auto logs = decodeLogsValue(value.get()); for (auto const& log : logs.first) { if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) { @@ -1264,6 +1299,7 @@ ACTOR Future ExclusionInProgressActor(ReadYourWritesTransaction* ry result.arena().dependsOn(addrKey.arena()); } } + return result; } diff --git a/fdbclient/include/fdbclient/DatabaseConfiguration.h b/fdbclient/include/fdbclient/DatabaseConfiguration.h index 3100a2ddd9..d3675d4662 100644 --- a/fdbclient/include/fdbclient/DatabaseConfiguration.h +++ b/fdbclient/include/fdbclient/DatabaseConfiguration.h @@ -266,7 +266,6 @@ struct DatabaseConfiguration { // Excluded servers (no state should be here) bool isExcludedServer(NetworkAddressList, const LocalityData& locality) const; bool isExcludedLocality(const LocalityData& locality) const; - bool isMachineExcluded(const LocalityData& locality) const; std::set getExcludedServers() const; std::set getExcludedLocalities() const; diff --git a/fdbclient/include/fdbclient/ManagementAPI.actor.h b/fdbclient/include/fdbclient/ManagementAPI.actor.h index 3fe3b4e734..e7d436c7a3 100644 --- a/fdbclient/include/fdbclient/ManagementAPI.actor.h +++ 
b/fdbclient/include/fdbclient/ManagementAPI.actor.h @@ -108,11 +108,17 @@ ACTOR Future> getExcludedLocalityList(Transaction* tr); // Get the current list of failed localities. ACTOR Future> getExcludedFailedLocalityList(Transaction* tr); +// Decodes the locality string to a pair of locality prefix and its value. +// The prefix could be, for example, dcid, machineid, or processid. +std::pair decodeLocality(const std::string& locality); +std::set getServerAddressesByLocality( + const std::map server_interfaces, + const std::string& locality); std::set getAddressesByLocality(const std::vector& workers, const std::string& locality); -// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is -// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and -// unless any of them are explicitly included with includeServers() +// Check for the given, previously excluded servers to be evacuated (no longer used for state). 
If waitForAllExcluded +// is true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, +// until and unless any of them are explicitly included with includeServers() ACTOR Future> checkForExcludingServers(Database cx, std::vector servers, bool waitForAllExcluded); diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index c59cf9a769..d555ac23c8 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -26,7 +26,6 @@ const StringRef LocalityData::keyZoneId = "zoneid"_sr; const StringRef LocalityData::keyDcId = "dcid"_sr; const StringRef LocalityData::keyMachineId = "machineid"_sr; const StringRef LocalityData::keyDataHallId = "data_hall"_sr; -const StringRef LocalityData::ExcludeLocalityKeyMachineIdPrefix = "locality_machineid:"_sr; const StringRef LocalityData::ExcludeLocalityPrefix = "locality_"_sr; ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const { diff --git a/fdbrpc/include/fdbrpc/Locality.h b/fdbrpc/include/fdbrpc/Locality.h index eafac29ea8..176f70ed5e 100644 --- a/fdbrpc/include/fdbrpc/Locality.h +++ b/fdbrpc/include/fdbrpc/Locality.h @@ -397,7 +397,6 @@ public: } static const UID UNSET_ID; - static const StringRef ExcludeLocalityKeyMachineIdPrefix; static const StringRef ExcludeLocalityPrefix; };