diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 83f034dbbf..953d4dd761 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -2391,7 +2391,7 @@ ACTOR Future coordinators(Database db, std::vector tokens, bool return err; } -// Includes the servers that could be ipaddresses or localities back to the cluster. +// Includes the servers that could be IP addresses or localities back to the cluster. ACTOR Future include(Database db, std::vector tokens) { std::vector addresses; state std::vector localities; @@ -2423,14 +2423,14 @@ ACTOR Future include(Database db, std::vector tokens) { std::vector includeAll; includeAll.push_back(AddressExclusion()); wait(makeInterruptable(includeServers(db, includeAll, failed))); - wait(makeInterruptable(includeLocalities(db, &localities, failed, all))); + wait(makeInterruptable(includeLocalities(db, localities, failed, all))); } else { if (!addresses.empty()) { wait(makeInterruptable(includeServers(db, addresses, failed))); } if (!localities.empty()) { // includes the servers that belong to given localities. - wait(makeInterruptable(includeLocalities(db, &localities, failed, all))); + wait(makeInterruptable(includeLocalities(db, localities, failed, all))); } } return false; @@ -2492,7 +2492,7 @@ ACTOR Future exclude(Database db, noMatchLocalities.push_back(t->toString()); } else { // add all the server ipaddresses that belong to the given localities to the exclusionSet. - std::copy(localityAddresses.begin(), localityAddresses.end(), back_inserter(exclusionVector)); + exclusionVector.insert(exclusionVector.end(), localityAddresses.begin(), localityAddresses.end()); exclusionSet.insert(localityAddresses.begin(), localityAddresses.end()); } exclusionLocalities.insert(t->toString()); @@ -2632,7 +2632,7 @@ ACTOR Future exclude(Database db, wait(makeInterruptable(excludeServers(db, exclusionAddresses, markFailed))); } if (!exclusionLocalities.empty()) { - wait(makeInterruptable(excludeLocalities(db, &exclusionLocalities, markFailed))); + wait(makeInterruptable(excludeLocalities(db, exclusionLocalities, markFailed))); } if (waitForAllExcluded) { diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 96268bda36..a66b1df4b9 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1597,7 +1597,7 @@ ACTOR Future excludeServers(Database cx, vector servers, } // excludes localities by setting the keys in api version below 7.0 -void excludeLocalities(Transaction& tr, std::unordered_set* localities, bool failed) { +void excludeLocalities(Transaction& tr, std::unordered_set localities, bool failed) { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr.setOption(FDBTransactionOptions::LOCK_AWARE); @@ -1606,35 +1606,35 @@ void excludeLocalities(Transaction& tr, std::unordered_set* localit auto localityVersionKey = failed ? failedLocalityVersionKey : excludedLocalityVersionKey; tr.addReadConflictRange(singleKeyRange(localityVersionKey)); // To conflict with parallel includeLocalities tr.set(localityVersionKey, excludeVersionKey); - for (const auto& l : *localities) { + for (const auto& l : localities) { if (failed) { tr.set(encodeFailedLocalityKey(l), StringRef()); } else { tr.set(encodeExcludedLocalityKey(l), StringRef()); } } - TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(*localities)).detail("ExcludeFailed", failed); + TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(localities)).detail("ExcludeFailed", failed); } // Exclude the servers matching the given set of localities from use as state servers. // excludes localities by setting the keys. -ACTOR Future excludeLocalities(Database cx, std::unordered_set* localities, bool failed) { +ACTOR Future excludeLocalities(Database cx, std::unordered_set localities, bool failed) { if (cx->apiVersionAtLeast(700)) { state ReadYourWritesTransaction ryw(cx); loop { try { ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey( - failed ? "failedlocality" : "excludedlocality", "force"), + failed ? "failed_locality" : "excluded_locality", "force"), ValueRef()); - for (const auto& l : *localities) { + for (const auto& l : localities) { Key addr = failed ? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l) : SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l); ryw.set(addr, ValueRef()); } TraceEvent("ExcludeLocalitiesSpecialKeySpaceCommit") - .detail("Localities", describe(*localities)) + .detail("Localities", describe(localities)) .detail("ExcludeFailed", failed); wait(ryw.commit()); @@ -1762,7 +1762,7 @@ ACTOR Future includeServers(Database cx, vector servers, // Remove the given localities from the exclusion list. // include localities by clearing the keys. -ACTOR Future includeLocalities(Database cx, vector* localities, bool failed, bool includeAll) { +ACTOR Future includeLocalities(Database cx, vector localities, bool failed, bool includeAll) { state std::string versionKey = deterministicRandom()->randomUniqueID().toString(); if (cx->apiVersionAtLeast(700)) { state ReadYourWritesTransaction ryw(cx); @@ -1776,7 +1776,7 @@ ACTOR Future includeLocalities(Database cx, vector* localitie ryw.clear(SpecialKeySpace::getManamentApiCommandRange("excludedlocality")); } } else { - for (const auto& l : *localities) { + for (const auto& l : localities) { Key locality = failed ? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l) : SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l); @@ -1784,7 +1784,7 @@ ACTOR Future includeLocalities(Database cx, vector* localitie } } TraceEvent("IncludeLocalitiesCommit") - .detail("Localities", describe(*localities)) + .detail("Localities", describe(localities)) .detail("Failed", failed) .detail("IncludeAll", includeAll); @@ -1822,7 +1822,7 @@ ACTOR Future includeLocalities(Database cx, vector* localitie tr.clear(excludedLocalityKeys); } } else { - for (const auto& l : *localities) { + for (const auto& l : localities) { if (failed) { tr.clear(encodeFailedLocalityKey(l)); } else { @@ -1832,7 +1832,7 @@ ACTOR Future includeLocalities(Database cx, vector* localitie } TraceEvent("IncludeLocalitiesCommit") - .detail("Localities", describe(*localities)) + .detail("Localities", describe(localities)) .detail("Failed", failed) .detail("IncludeAll", includeAll); @@ -1955,7 +1955,7 @@ ACTOR Future> getExcludedLocalities(Database cx) { // Decodes the locality string to a pair of locality prefix and its value. // The prefix could be dcid, processid, machineid, processid. -std::pair decodeLocality(std::string& locality) { +std::pair decodeLocality(const std::string& locality) { StringRef localityRef(locality.c_str()); if (localityRef.startsWith(ExcludeLocalityKeyDcIdPrefix)) { return std::make_pair(LocalityData::keyDcId.toString(), @@ -1976,7 +1976,8 @@ std::pair decodeLocality(std::string& locality) { // Returns the list of IPAddresses of the workers that match the given locality. // Example: locality="dcid:primary" returns all the ip addresses of the workers in the primary dc. -std::set getAddressesByLocality(std::vector& workers, std::string locality) { +std::set getAddressesByLocality(const std::vector& workers, + const std::string& locality) { std::pair localityKeyValue = decodeLocality(locality); std::set localityAddresses; diff --git a/fdbclient/ManagementAPI.actor.h b/fdbclient/ManagementAPI.actor.h index 7660f62f06..4cf46a3cf8 100644 --- a/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/ManagementAPI.actor.h @@ -165,8 +165,8 @@ void excludeServers(Transaction& tr, vector& servers, bool fai // Exclude the servers matching the given set of localities from use as state servers. Returns as soon as the change // is durable, without necessarily waiting for the servers to be evacuated. -ACTOR Future excludeLocalities(Database cx, std::unordered_set* localities, bool failed = false); -void excludeLocalities(Transaction& tr, std::unordered_set* localities, bool failed = false); +ACTOR Future excludeLocalities(Database cx, std::unordered_set localities, bool failed = false); +void excludeLocalities(Transaction& tr, std::unordered_set localities, bool failed = false); // Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given // IP. A NetworkAddress() means all servers (don't exclude anything) @@ -174,7 +174,7 @@ ACTOR Future includeServers(Database cx, vector servers, // Remove the given localities from the exclusion list. ACTOR Future includeLocalities(Database cx, - vector* localities, + vector localities, bool failed = false, bool includeAll = false); @@ -190,7 +190,7 @@ ACTOR Future> getExcludedServers(Transaction* tr); ACTOR Future> getExcludedLocalities(Database cx); ACTOR Future> getExcludedLocalities(Transaction* tr); -std::set getAddressesByLocality(std::vector& workers, std::string locality); +std::set getAddressesByLocality(const std::vector& workers, const std::string& locality); // Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is // true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 2e4ec84dbd..0269ab006e 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -747,7 +747,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "coordinators_count":1, "excluded_servers":[ { - "address":"10.0.4.1" + "address":"10.0.4.1", + "locality":"processid:e9816ca4a89ff64ddb7ba2a5ec10b75b" } ], "auto_commit_proxies":3, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 2db85f4c77..5b5b20d174 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -106,8 +106,8 @@ std::unordered_map SpecialKeySpace::managementApiCommandT std::set SpecialKeySpace::options = { "excluded/force", "failed/force", - "excludedlocality/force", - "failedlocality/force" }; + "excluded_locality/force", + "failed_locality/force" }; std::set SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey }; @@ -2211,7 +2211,7 @@ ACTOR Future> excludeLocalityCommitActor(ReadYourWritesTra return result; // If force option is not set, we need to do safety check auto force = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey( - failed ? "failedlocality" : "excludedlocality", "force")]; + failed ? "failed_locality" : "excluded_locality", "force")]; // only do safety check when we have localities to be excluded and the force option key is not set if (localities.size() && !(force.first && force.second.present())) { bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, failed, &result)); @@ -2219,7 +2219,7 @@ ACTOR Future> excludeLocalityCommitActor(ReadYourWritesTra return result; } - excludeLocalities(ryw->getTransaction(), &localities, failed); + excludeLocalities(ryw->getTransaction(), localities, failed); includeLocalities(ryw); return result; diff --git a/fdbserver/workloads/RemoveServersSafely.actor.cpp b/fdbserver/workloads/RemoveServersSafely.actor.cpp index 7fdb9c72ff..3c9b6b041f 100644 --- a/fdbserver/workloads/RemoveServersSafely.actor.cpp +++ b/fdbserver/workloads/RemoveServersSafely.actor.cpp @@ -35,7 +35,7 @@ struct RemoveServersSafelyWorkload : TestWorkload { bool enabled, killProcesses; int minMachinesToKill, maxMachinesToKill, maxSafetyCheckRetries; double minDelay, maxDelay; - double kill1Timeout, kill2Timeout, kill3Timeout; + double kill1Timeout, kill2Timeout; std::set toKill1, toKill2; std::map>> machine_ids; // ip -> Locality Zone id @@ -52,7 +52,6 @@ struct RemoveServersSafelyWorkload : TestWorkload { maxDelay = getOption(options, LiteralStringRef("maxDelay"), 60.0); kill1Timeout = getOption(options, LiteralStringRef("kill1Timeout"), 60.0); kill2Timeout = getOption(options, LiteralStringRef("kill2Timeout"), 6000.0); - kill3Timeout = getOption(options, LiteralStringRef("kill3Timeout"), 6000.0); killProcesses = deterministicRandom()->random01() < 0.5; if (g_network->isSimulated()) { g_simulator.allowLogSetKills = false; @@ -144,7 +143,7 @@ struct RemoveServersSafelyWorkload : TestWorkload { if (!enabled) return Void(); double delay = deterministicRandom()->random01() * (maxDelay - minDelay) + minDelay; - return workloadMain(this, cx, delay, toKill1, toKill2, minMachinesToKill, maxMachinesToKill); + return workloadMain(this, cx, delay, toKill1, toKill2); } Future check(Database const& cx) override { return true; } @@ -338,9 +337,7 @@ struct RemoveServersSafelyWorkload : TestWorkload { Database cx, double waitSeconds, std::set toKill1, - std::set toKill2, - int minMachinesToKill, - int maxMachinesToKill) { + std::set toKill2) { wait(updateProcessIds(cx)); wait(delay(waitSeconds)); @@ -381,6 +378,7 @@ struct RemoveServersSafelyWorkload : TestWorkload { // Include the servers, if unable to exclude // Reinclude when buggify is on to increase the surface area of the next set of excludes + state bool failed = true; if (!bClearedFirst || BUGGIFY) { // Get the updated list of processes which may have changed due to reboots, deletes, etc TraceEvent("RemoveAndKill") @@ -389,6 +387,8 @@ struct RemoveServersSafelyWorkload : TestWorkload { .detail("ToKill", describe(toKill1)) .detail("ClusterAvailable", g_simulator.isAvailable()); wait(includeServers(cx, vector(1))); + wait(includeLocalities(cx, vector(), failed, true)); + wait(includeLocalities(cx, vector(), !failed, true)); self->includeAddresses(toKill1); } @@ -425,85 +425,16 @@ struct RemoveServersSafelyWorkload : TestWorkload { .detail("ToKill", describe(toKill2)) .detail("ClusterAvailable", g_simulator.isAvailable()); - // Reinclude all of the machine, if buggified - if (BUGGIFY) { - // Get the updated list of processes which may have changed due to reboots, deletes, etc - TraceEvent("RemoveAndKill") - .detail("Step", "include all second") - .detail("KillTotal", toKill2.size()) - .detail("ToKill", describe(toKill2)) - .detail("ClusterAvailable", g_simulator.isAvailable()); - wait(includeServers(cx, vector(1))); - self->includeAddresses(toKill2); - } - - std::unordered_set localities; - auto processes = getServers(); - // collecting all localities - for (const auto& it : processes) { - if (it->locality.dcId().present()) - localities.insert(ExcludeLocalityKeyDcIdPrefix.toString() + it->locality.dcId().get().toString()); - if (it->locality.zoneId().present()) - localities.insert(ExcludeLocalityKeyZoneIdPrefix.toString() + it->locality.zoneId().get().toString()); - if (it->locality.machineId().present()) - localities.insert(ExcludeLocalityKeyMachineIdPrefix.toString() + - it->locality.machineId().get().toString()); - if (it->locality.processId().present()) - localities.insert(ExcludeLocalityKeyProcessIdPrefix.toString() + - it->locality.processId().get().toString()); - } - int localitiesCount = localities.size(); - int nToKill3 = deterministicRandom()->randomInt(std::min(localitiesCount, minMachinesToKill), - std::min(localitiesCount, maxMachinesToKill) + 1); - // get random subset of localities. - state std::set toKill3 = random_subset(localities, nToKill3); - - TraceEvent("RemoveAndKillLocalities") - .detail("LocalitiesSize", localities.size()) - .detail("Kill3Size", toKill3.size()) - .detail("ToKill3", describe(toKill3)); - - // toKill3 may kill too many servers to make cluster unavailable. - // Get the processes in toKill3 that are safe to kill - state std::set toKill3Addresses = wait(self->getLocalitiesAddresses(cx, toKill3)); - killProcArray = self->protectServers(toKill3Addresses); - - // Update the kill networks to the killable localities - toKill3 = self->getSafeLocalitiesToKill(killProcArray); - state bool failed = - deterministicRandom()->randomInt(0, 2) ? true : false; // excluding with random failed option - - TraceEvent("RemoveAndKillLocalities") - .detail("Step", "exclude third list") - .detail("ToKill3AddressesSize", toKill3Addresses.size()) - .detail("KillProcArraySize", killProcArray.size()) - .detail("Kill3Size", toKill3.size()) - .detail("ToKill3", describe(toKill3)) + // Get the updated list of processes which may have changed due to reboots, deletes, etc + TraceEvent("RemoveAndKill") + .detail("Step", "include all second") + .detail("KillTotal", toKill2.size()) + .detail("ToKill", describe(toKill2)) .detail("ClusterAvailable", g_simulator.isAvailable()); - // exclude localities - wait(reportErrors( - timeoutError(removeAndKillLocalities(self, cx, toKill3, bClearedFirst ? &toKill2 : nullptr, failed), - self->kill3Timeout), - "RemoveServersSafelyError", - UID())); - - TraceEvent("RemoveAndKillLocalities") - .detail("Step", "excluded thrid list") - .detail("Kill3Size", toKill3.size()) - .detail("ToKill3", describe(toKill3)) - .detail("ClusterAvailable", g_simulator.isAvailable()); - - // Reinclude all of the machine, if buggified - if (BUGGIFY) { - // Get the updated list of processes which may have changed due to reboots, deletes, etc - TraceEvent("RemoveAndKillLocalities") - .detail("Step", "include all third") - .detail("Kill3Size", toKill3.size()) - .detail("ToKill3", describe(toKill3)) - .detail("ClusterAvailable", g_simulator.isAvailable()); - vector emptyLocalities; - wait(includeLocalities(cx, &emptyLocalities, failed, true)); - } + wait(includeServers(cx, vector(1))); + wait(includeLocalities(cx, vector(), failed, true)); + wait(includeLocalities(cx, vector(), !failed, true)); + self->includeAddresses(toKill2); return Void(); } @@ -591,7 +522,10 @@ struct RemoveServersSafelyWorkload : TestWorkload { .detail("Step", "Including all") .detail("ClusterAvailable", g_simulator.isAvailable()) .detail("MarkExcludeAsFailed", markExcludeAsFailed); + state bool failed = true; wait(includeServers(cx, vector(1))); + wait(includeLocalities(cx, vector(), failed, true)); + wait(includeLocalities(cx, vector(), !failed, true)); TraceEvent("RemoveAndKill", functionId) .detail("Step", "Included all") .detail("ClusterAvailable", g_simulator.isAvailable()) @@ -692,20 +626,60 @@ struct RemoveServersSafelyWorkload : TestWorkload { .detail("FailedAddresses", describe(toKillMarkFailedArray)) .detail("ClusterAvailable", g_simulator.isAvailable()) .detail("MarkExcludeAsFailed", markExcludeAsFailed); + + state bool excludeLocalitiesInsteadOfServers = deterministicRandom()->coinflip(); if (markExcludeAsFailed) { - wait(excludeServers(cx, toKillMarkFailedArray, true)); + if (excludeLocalitiesInsteadOfServers) { + state std::unordered_set toKillLocalitiesFailed = + self->getLocalitiesFromAddresses(toKillMarkFailedArray); + TraceEvent("RemoveAndKill", functionId) + .detail("Step", "Excluding localities with failed option") + .detail("FailedAddressesSize", toKillMarkFailedArray.size()) + .detail("FailedAddresses", describe(toKillMarkFailedArray)) + .detail("FailedLocaitiesSize", toKillLocalitiesFailed.size()) + .detail("FailedLocaities", describe(toKillLocalitiesFailed)); + + wait(excludeLocalities(cx, toKillLocalitiesFailed, true)); + } else { + TraceEvent("RemoveAndKill", functionId) + .detail("Step", "Excluding servers with failed option") + .detail("FailedAddressesSize", toKillMarkFailedArray.size()) + .detail("FailedAddresses", describe(toKillMarkFailedArray)); + + wait(excludeServers(cx, toKillMarkFailedArray, true)); + } + } + + if (excludeLocalitiesInsteadOfServers) { + state std::unordered_set toKillLocalities = self->getLocalitiesFromAddresses(toKillArray); + TraceEvent("RemoveAndKill", functionId) + .detail("Step", "Excluding localities without failed option") + .detail("AddressesSize", toKillArray.size()) + .detail("Addresses", describe(toKillArray)) + .detail("LocaitiesSize", toKillLocalities.size()) + .detail("Locaities", describe(toKillLocalities)); + + wait(excludeLocalities(cx, toKillLocalities, false)); + } else { + TraceEvent("RemoveAndKill", functionId) + .detail("Step", "Excluding servers without failed option") + .detail("AddressesSize", toKillArray.size()) + .detail("Addresses", describe(toKillArray)); + + wait(excludeServers(cx, toKillArray)); } - wait(excludeServers(cx, toKillArray)); // We need to skip at least the quorum change if there's nothing to kill, because there might not be enough // servers left alive to do a coordinators auto (?) if (toKill.size()) { - // Wait for removal to be safe - TraceEvent("RemoveAndKill", functionId) - .detail("Step", "Wait For Server Exclusion") - .detail("Addresses", describe(toKill)) - .detail("ClusterAvailable", g_simulator.isAvailable()); - wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */))); + if (!excludeLocalitiesInsteadOfServers) { + // Wait for removal to be safe + TraceEvent("RemoveAndKill", functionId) + .detail("Step", "Wait For Server Exclusion") + .detail("Addresses", describe(toKill)) + .detail("ClusterAvailable", g_simulator.isAvailable()); + wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */))); + } TraceEvent("RemoveAndKill", functionId) .detail("Step", "coordinators auto") @@ -768,181 +742,63 @@ struct RemoveServersSafelyWorkload : TestWorkload { return subset; } - // creates a random set of size n from given unordered_set. - template - static std::set random_subset(std::unordered_set& s, int n) { - return random_subset(std::vector(s.begin(), s.end()), n); - } - - // creates a random set of size n from given set. - template - static std::set random_subset(std::set& s, int n) { - return random_subset(std::vector(s.begin(), s.end()), n); - } - bool killContainsProcess(AddressExclusion kill, NetworkAddress process) { return kill.excludes(process) || (machineProcesses.find(kill) != machineProcesses.end() && machineProcesses[kill].count(AddressExclusion(process.ip, process.port)) > 0); } - // Returns the list of IPAddresses of the workers that match the given list of localities. - ACTOR Future> getLocalitiesAddresses(Database cx, std::set localities) { - state Transaction tr(cx); - state std::vector workers = wait(getWorkers(&tr)); - state std::set addressesSet; - for (const auto& l : localities) { - std::set localityAddresses = getAddressesByLocality(workers, l); - addressesSet.insert(localityAddresses.begin(), localityAddresses.end()); - } - - return addressesSet; - } - - // Finds the safe localities list that can be excluded from the killable safeProcesses list. + // Finds the localities list that can be excluded from the safe killable addresses list. // If excluding based on a particular locality of the safe process, kills any other process, that - // particular locality is not included in the killable safeLocalities list. - std::set getSafeLocalitiesToKill(std::vector const& safeProcesses) { - std::unordered_map safeLocalitiesCount; - for (const auto& processInfo : safeProcesses) { - if (processInfo->locality.dcId().present()) - safeLocalitiesCount[ExcludeLocalityKeyDcIdPrefix.toString() + - processInfo->locality.dcId().get().toString()]++; - if (processInfo->locality.machineId().present()) - safeLocalitiesCount[ExcludeLocalityKeyMachineIdPrefix.toString() + - processInfo->locality.machineId().get().toString()]++; - if (processInfo->locality.processId().present()) - safeLocalitiesCount[ExcludeLocalityKeyProcessIdPrefix.toString() + - processInfo->locality.processId().get().toString()]++; - if (processInfo->locality.zoneId().present()) - safeLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() + - processInfo->locality.zoneId().get().toString()]++; - } - - auto processes = getServers(); + // particular locality is not included in the killable localities list. + std::unordered_set getLocalitiesFromAddresses(const std::vector& addresses) { std::unordered_map allLocalitiesCount; + std::unordered_map killableLocalitiesCount; + auto processes = getServers(); for (const auto& processInfo : processes) { if (processInfo->locality.dcId().present()) allLocalitiesCount[ExcludeLocalityKeyDcIdPrefix.toString() + processInfo->locality.dcId().get().toString()]++; + if (processInfo->locality.zoneId().present()) + allLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() + + processInfo->locality.zoneId().get().toString()]++; if (processInfo->locality.machineId().present()) allLocalitiesCount[ExcludeLocalityKeyMachineIdPrefix.toString() + processInfo->locality.machineId().get().toString()]++; if (processInfo->locality.processId().present()) allLocalitiesCount[ExcludeLocalityKeyProcessIdPrefix.toString() + processInfo->locality.processId().get().toString()]++; - if (processInfo->locality.zoneId().present()) - allLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() + - processInfo->locality.zoneId().get().toString()]++; + + AddressExclusion pAddr(processInfo->address.ip, processInfo->address.port); + if (std::find(addresses.begin(), addresses.end(), pAddr) != addresses.end()) { + if (processInfo->locality.dcId().present()) + killableLocalitiesCount[ExcludeLocalityKeyDcIdPrefix.toString() + + processInfo->locality.dcId().get().toString()]++; + if (processInfo->locality.zoneId().present()) + killableLocalitiesCount[ExcludeLocalityKeyZoneIdPrefix.toString() + + processInfo->locality.zoneId().get().toString()]++; + if (processInfo->locality.machineId().present()) + killableLocalitiesCount[ExcludeLocalityKeyMachineIdPrefix.toString() + + processInfo->locality.machineId().get().toString()]++; + if (processInfo->locality.processId().present()) + killableLocalitiesCount[ExcludeLocalityKeyProcessIdPrefix.toString() + + processInfo->locality.processId().get().toString()]++; + } } - std::set safeLocalities; - for (const auto& l : safeLocalitiesCount) { + std::unordered_set toKillLocalities; + for (const auto& l : killableLocalitiesCount) { if (l.second == allLocalitiesCount[l.first]) { - safeLocalities.insert(l.first); + toKillLocalities.insert(l.first); } } - return safeLocalities; - } - - // Attempts to exclude a set of localities, and once the exclusion is successful it kills them. - // If markExcludeAsFailed is true, then it is an error if we cannot complete the exclusion. - ACTOR static Future removeAndKillLocalities(RemoveServersSafelyWorkload* self, - Database cx, - std::set toKill, - std::set* pIncAddrs, - bool markExcludeAsFailed) { - state UID functionId = nondeterministicRandom()->randomUniqueID(); - - // First clear the exclusion list and exclude the given list - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "Including all") - .detail("ClusterAvailable", g_simulator.isAvailable()) - .detail("MarkExcludeAsFailed", markExcludeAsFailed); - wait(includeServers(cx, vector(1))); - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "Included all") - .detail("ClusterAvailable", g_simulator.isAvailable()) - .detail("MarkExcludeAsFailed", markExcludeAsFailed); - // Reinclude the addresses that were excluded, if present - if (pIncAddrs) { - self->includeAddresses(*pIncAddrs); - } - - state std::set toKillMarkFailedSet; - // if markExcludedasFailed is true, get random subset of tokill, check for - // safe exclusions and exclude them with failed option. - if (markExcludeAsFailed) { - state int retries = 0; - loop { - state bool safe = false; - toKillMarkFailedSet = random_subset(toKill, deterministicRandom()->randomInt(0, toKill.size() + 1)); - state std::set toKillMarkFailedAddressesSet = - wait(self->getLocalitiesAddresses(cx, toKillMarkFailedSet)); - state std::vector toKillMarkFailedAddressesVector( - toKillMarkFailedAddressesSet.begin(), toKillMarkFailedAddressesSet.end()); - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "SafetyCheck") - .detail("Exclusion Localities", describe(toKillMarkFailedSet)) - .detail("Exclusion Addresses", describe(toKillMarkFailedAddressesSet)); - choose { - when(bool _safe = wait(checkSafeExclusions(cx, toKillMarkFailedAddressesVector))) { - safe = _safe && self->protectServers(toKillMarkFailedAddressesSet).size() == - toKillMarkFailedAddressesSet.size(); - } - when(wait(delay(5.0))) { - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "SafetyCheckTimedOut") - .detail("Exclusion Localities", describe(toKillMarkFailedSet)) - .detail("Exclusion Addresses", describe(toKillMarkFailedAddressesSet)); - } - } - if (retries == self->maxSafetyCheckRetries) { - // Do not mark as failed if limit is reached - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "SafetyCheckLimitReached") - .detail("Retries", retries); - markExcludeAsFailed = false; - safe = true; - } - if (safe) - break; - retries++; - } - } - - state std::unordered_set toKillUnorderedSet; - state std::unordered_set toKillMarkFailedUnorderedSet; - toKillUnorderedSet.insert(toKill.begin(), toKill.end()); - toKillMarkFailedUnorderedSet.insert(toKillMarkFailedSet.begin(), toKillMarkFailedSet.end()); - - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "Activate Localities Exclusion") - .detail("KillLocalitiesSize", toKill.size()) - .detail("ToKill", describe(toKill)) - .detail("Localities", describe(toKillUnorderedSet)) - .detail("FailedLocalities", describe(toKillMarkFailedUnorderedSet)) - .detail("ClusterAvailable", g_simulator.isAvailable()) - .detail("MarkExcludeAsFailed", markExcludeAsFailed); - // exclude localities with failed option as true - if (markExcludeAsFailed) { - wait(excludeLocalities(cx, &toKillMarkFailedUnorderedSet, true)); - } - // exclude localities with failed option as false. - wait(excludeLocalities(cx, &toKillUnorderedSet)); - - TraceEvent("RemoveAndKillLocalities", functionId) - .detail("Step", "done") - .detail("ClusterAvailable", g_simulator.isAvailable()); - - return Void(); + return toKillLocalities; } // Update the g_simulator processes list with the process ids // of the workers, that are generated as part of worker creation. ACTOR static Future updateProcessIds(Database cx) { - Transaction tr(cx); - std::vector workers = wait(getWorkers(&tr)); + std::vector workers = wait(getWorkers(cx)); std::unordered_map addressToIndexMap; for (int i = 0; i < workers.size(); i++) { addressToIndexMap[workers[i].address] = i;