From 9553458b787639fc58a19cb8452dac2b36bc5ed0 Mon Sep 17 00:00:00 2001 From: Alvin Moore Date: Mon, 19 Jun 2017 16:48:15 -0700 Subject: [PATCH] Updated simulation to support managing exclusion and inclusion address Added method for identifying acceptable availability process classes Extended cluster availability function to ensure coordinators can be auto configured Fixed availability function to allow protected processes to be considered as dead if not available Added debug trace events for providing machine state when considering availability Added trace event for protected coordinators --- fdbclient/ManagementAPI.actor.cpp | 4 +- fdbrpc/sim2.actor.cpp | 75 +++++++++++++++++++++------- fdbrpc/simulator.h | 37 ++++++++++++-- fdbserver/SimulatedCluster.actor.cpp | 25 ++++++---- 4 files changed, 107 insertions(+), 34 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 81a306c9d4..fad3176af2 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -705,7 +705,9 @@ ACTOR Future changeQuorum( Database cx, ReferenceisSimulated()) { for(int i = 0; i < (desiredCoordinators.size()/2)+1; i++) { - g_simulator.protectedAddresses.insert( NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false) ); + auto address = NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false); + g_simulator.protectedAddresses.insert(address); + TraceEvent("ProtectCoordinator").detail("Address", address).backtrace(); } } diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 89b5df40db..79b7d77668 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -946,11 +946,12 @@ public: machine.processes.push_back(m); currentlyRebootingProcesses.erase(address); addressMap[ m->address ] = m; + m->excluded = g_simulator.isExcluded(address); m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics); m->setGlobal(enNetworkConnections, (flowGlobalType) m->network); - TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detailext("zoneId", m->locality.zoneId()); + TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detailext("zoneId", m->locality.zoneId()).detail("Excluded", m->excluded); // FIXME: Sometimes, connections to/from this process will explicitly close @@ -962,14 +963,14 @@ public: for (auto processInfo : getAllProcesses()) { // Add non-test processes (ie. datahall is not be set for test processes) - if (processInfo->startingClass != ProcessClass::TesterClass) { - // Do not kill protected processes - if (protectedAddresses.count(processInfo->address)) - processesLeft.push_back(processInfo); - else if (processInfo->isAvailable()) + if (processInfo->isAvailableClass()) { + // Mark all of the unavailable as dead + if (!processInfo->isAvailable()) + processesDead.push_back(processInfo); + else if (protectedAddresses.count(processInfo->address)) processesLeft.push_back(processInfo); else - processesDead.push_back(processInfo); + processesLeft.push_back(processInfo); } } return canKillProcesses(processesLeft, processesDead, KillInstantly, NULL); @@ -979,16 +980,20 @@ public: virtual bool canKillProcesses(std::vector const& availableProcesses, std::vector const& deadProcesses, KillType kt, KillType* newKillType) const { bool canSurvive = true; + int nQuorum = ((desiredCoordinators+1)/2)*2-1; + KillType newKt = kt; if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) { LocalityGroup processesLeft, processesDead; std::vector localitiesDead, localitiesLeft, badCombo; + std::set>> uniqueMachines; ASSERT(storagePolicy); ASSERT(tLogPolicy); for (auto processInfo : availableProcesses) { processesLeft.add(processInfo->locality); localitiesLeft.push_back(processInfo->locality); + uniqueMachines.insert(processInfo->locality.machineId()); } for (auto processInfo : deadProcesses) { processesDead.add(processInfo->locality); @@ -1023,8 +1028,13 @@ public: canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Reason", "storagePolicy does not validates against remaining processes."); } + else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (nQuorum > uniqueMachines.size())) { + auto newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot; + canSurvive = false; + TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("Reason", "Not enough unique machines to perform auto configuration of coordinators."); + } else { - TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()); + TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("ZonesLeft", ::describeZones(localitiesLeft)).detail("ValidateRemaining", processesLeft.validate(tLogPolicy)); } } if (newKillType) *newKillType = newKt; @@ -1032,7 +1042,7 @@ public: } virtual void destroyProcess( ISimulator::ProcessInfo *p ) { - TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detailext("zoneId", p->locality.zoneId()).backtrace(); + TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detailext("zoneId", p->locality.zoneId()); currentlyRebootingProcesses.insert(std::pair(p->address, p)); std::vector& processes = machines[ p->locality.zoneId().get() ].processes; if( p != processes.back() ) { @@ -1105,7 +1115,7 @@ public: auto ktOrig = kt; if (killIsSafe) ASSERT( kt == ISimulator::RebootAndDelete ); // Only types of "safe" kill supported so far - TEST(true); // Trying to killing a machine + TEST(true); // Trying to killing a machine TEST(kt == KillInstantly); // Trying to kill instantly TEST(kt == InjectFaults); // Trying to kill by injecting faults @@ -1124,6 +1134,12 @@ public: processesOnMachine++; } + // Do nothing, if no processes to kill + if (processesOnMachine == 0) { + TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target had no processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace(); + return false; + } + // Check if machine can be removed, if requested if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) { @@ -1132,12 +1148,13 @@ public: for (auto machineRec : machines) { for (auto processInfo : machineRec.second.processes) { // Add non-test processes (ie. datahall is not be set for test processes) - if (processInfo->startingClass != ProcessClass::TesterClass) { - if (protectedAddresses.count(processInfo->address)) + if (processInfo->isAvailableClass()) { + if (!processInfo->isAvailable()) + processesDead.push_back(processInfo); + else if (protectedAddresses.count(processInfo->address)) processesLeft.push_back(processInfo); - else if (processInfo->isAvailable() && (machineRec.second.zoneId != zoneId)) { + else if (machineRec.second.zoneId != zoneId) processesLeft.push_back(processInfo); - } // Add processes from dead machines and datacenter machines to dead group else processesDead.push_back(processInfo); @@ -1155,9 +1172,18 @@ public: for (auto process : processesLeft) { TraceEvent("DeadMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process)); } + for (auto process : processesDead) { + TraceEvent("DeadMachineVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process)); + } } else { TraceEvent("ClearMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()); + for (auto process : processesLeft) { + TraceEvent("ClearMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process)); + } + for (auto process : processesDead) { + TraceEvent("ClearMachineVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process)); + } } } @@ -1170,6 +1196,14 @@ public: return false; } + // Check if any processes on machine are rebooting + if ( processesOnMachine != processesPerMachine) { + TEST(true); //Attempted reboot, but the target did not have all of its processes running + TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target did not have all of its processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace(); + return false; + } + + TraceEvent("KillMachine", zoneId).detailext("ZoneId", zoneId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KilledMachines", killedMachines).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig).detail("killIsSafe", killIsSafe); if (kt < RebootAndDelete ) { for (auto& process : machines[zoneId].processes) { @@ -1215,13 +1249,15 @@ public: for (auto machineRec : machines) { for (auto processInfo : machineRec.second.processes) { // Add non-test processes (ie. datahall is not be set for test processes) - if (processInfo->startingClass != ProcessClass::TesterClass) { - // Do not kill protected processes - if (protectedAddresses.count(processInfo->address)) + if (processInfo->isAvailableClass()) { + // Mark all of the unavailable as dead + if (!processInfo->isAvailable()) + processesDead.push_back(processInfo); + else if (protectedAddresses.count(processInfo->address)) processesLeft.push_back(processInfo); - else if (processInfo->isAvailable() && (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end())) { + // Keep all not in the datacenter zones + else if (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end()) processesLeft.push_back(processInfo); - } else processesDead.push_back(processInfo); } @@ -1391,6 +1427,7 @@ public: std::map>, MachineInfo > machines; std::map addressMap; std::map> filesDeadMap; + std::set exclusionSet; //tasks is guarded by ISimulator::mutex std::priority_queue> tasks; diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 92e4594fbf..dd69291dcd 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -71,20 +71,35 @@ public: : name(name), locality(locality), startingClass(startingClass), address(address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0), rebooting(false), fault_injection_p1(0), fault_injection_p2(0), - fault_injection_r(0), machine(0), io_timeout_injected(false) - {} + fault_injection_r(0), machine(0), io_timeout_injected(false) {} Future onShutdown() { return shutdownSignal.getFuture(); } bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; } bool isAvailable() const { return !excluded && isReliable(); } + // Returns true if the class represents an acceptable worker + bool isAvailableClass() const { + switch (startingClass._class) { + case ProcessClass::UnsetClass: return true; + case ProcessClass::StorageClass: return true; + case ProcessClass::TransactionClass: return true; + case ProcessClass::ResolutionClass: return false; + case ProcessClass::ProxyClass: return false; + case ProcessClass::MasterClass: return false; + case ProcessClass::TesterClass: return false; + case ProcessClass::StatelessClass: return false; + case ProcessClass::LogClass: return true; + default: return false; + } + } + inline flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; }; inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; }; std::string toString() const { - return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s coord: %s data: %s", - name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), coordinationFolder, dataFolder); } + return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s coord: %s data: %s excluded: %d", + name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), coordinationFolder, dataFolder, excluded); } // Members not for external use Promise shutdownSignal; @@ -135,6 +150,19 @@ public: virtual bool canKillProcesses(std::vector const& availableProcesses, std::vector const& deadProcesses, KillType kt, KillType* newKillType) const = 0; virtual bool isAvailable() const = 0; + virtual void excludeAddress(NetworkAddress const& address) { + excludedAddresses.insert(address); + } + virtual void includeAddress(NetworkAddress const& address) { + excludedAddresses.erase(address); + } + virtual void includeAllAddresses() { + excludedAddresses.clear(); + } + virtual bool isExcluded(NetworkAddress const& address) const { + return excludedAddresses.count(address) == 0; + } + virtual void disableSwapToMachine(Optional> zoneId ) { swapsDisabled.insert(zoneId); } @@ -201,6 +229,7 @@ protected: private: std::set>> swapsDisabled; + std::set excludedAddresses; bool allSwapsDisabled; }; diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 9eacebc403..b774cf5beb 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -198,15 +198,16 @@ ACTOR Future simulatedFDBDRebooter( bool runBackupAgents) { state ISimulator::ProcessInfo *simProcess = g_simulator.getCurrentProcess(); - state int cycles =0; + state UID randomId = g_nondeterministic_random->randomUniqueID(); + state int cycles = 0; loop { auto waitTime = SERVER_KNOBS->MIN_REBOOT_TIME + (SERVER_KNOBS->MAX_REBOOT_TIME - SERVER_KNOBS->MIN_REBOOT_TIME) * g_random->random01(); cycles ++; - TraceEvent("SimulatedFDBDWait").detail("Cycles", cycles) + TraceEvent("SimulatedFDBDWait").detail("Cycles", cycles).detail("RandomId", randomId) .detail("ProcessAddress", NetworkAddress(ip, port, true, false)) .detailext("ZoneId", localities.zoneId()) - .detail("waitTime", waitTime); + .detail("waitTime", waitTime).detail("Port", port); Void _ = wait( delay( waitTime ) ); @@ -215,10 +216,11 @@ ACTOR Future simulatedFDBDRebooter( state Future onShutdown = process->onShutdown(); try { - TraceEvent("SimulatedRebooterStarting", localities.zoneId()).detail("Cycles", cycles) + TraceEvent("SimulatedRebooterStarting", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId) .detailext("ZoneId", localities.zoneId()) .detailext("DataHall", localities.dataHallId()) .detail("ProcessAddress", process->address.toString()) + .detail("ProcessExcluded", process->excluded) .detail("UsingSSL", useSSL); TraceEvent("ProgramStart").detail("Cycles", cycles) .detail("SourceVersion", getHGVersion()) @@ -255,8 +257,9 @@ ACTOR Future simulatedFDBDRebooter( TraceEvent(e.code() == error_code_actor_cancelled || e.code() == error_code_file_not_found || destructed ? SevInfo : SevError, "SimulatedFDBDTerminated", localities.zoneId()).error(e, true); } - TraceEvent("SimulatedFDBDDone", localities.zoneId()).detail("Cycles", cycles) + TraceEvent("SimulatedFDBDDone", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId) .detail("ProcessAddress", process->address) + .detail("ProcessExcluded", process->excluded) .detailext("ZoneId", localities.zoneId()) .detail("KillType", onShutdown.isReady() ? onShutdown.get() : ISimulator::None); @@ -280,21 +283,23 @@ ACTOR Future simulatedFDBDRebooter( g_simulator.destroyProcess( process ); // Leak memory here; the process may be used in other parts of the simulation auto shutdownResult = onShutdown.get(); - TraceEvent("SimulatedFDBDShutdown", localities.zoneId()).detail("Cycles", cycles) + TraceEvent("SimulatedFDBDShutdown", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId) .detail("ProcessAddress", process->address) + .detail("ProcessExcluded", process->excluded) .detailext("ZoneId", localities.zoneId()) .detail("KillType", shutdownResult); if( shutdownResult < ISimulator::RebootProcessAndDelete ) { - TraceEvent("SimulatedFDBDLowerReboot", localities.zoneId()).detail("Cycles", cycles) + TraceEvent("SimulatedFDBDLowerReboot", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId) .detail("ProcessAddress", process->address) + .detail("ProcessExcluded", process->excluded) .detailext("ZoneId", localities.zoneId()) .detail("KillType", shutdownResult); return onShutdown.get(); } if( onShutdown.get() == ISimulator::RebootProcessAndDelete ) { - TraceEvent("SimulatedFDBDRebootAndDelete", localities.zoneId()).detail("Cycles", cycles) + TraceEvent("SimulatedFDBDRebootAndDelete", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId) .detail("ProcessAddress", process->address) .detailext("ZoneId", localities.zoneId()) .detail("KillType", shutdownResult); @@ -311,7 +316,7 @@ ACTOR Future simulatedFDBDRebooter( } } else { - TraceEvent("SimulatedFDBDJustRepeat", localities.zoneId()).detail("Cycles", cycles) + TraceEvent("SimulatedFDBDJustRepeat", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId) .detail("ProcessAddress", process->address) .detailext("ZoneId", localities.zoneId()) .detail("KillType", shutdownResult); @@ -744,7 +749,7 @@ void setupSimulatedSystem( vector> *systemActors, std::string baseF g_random->randomShuffle(coordinatorAddresses); for(int i = 0; i < (coordinatorAddresses.size()/2)+1; i++) { - TraceEvent("ProtectMachine").detail("Address", coordinatorAddresses[i]).detail("Coordinators", coordinatorAddresses.size()).backtrace(); + TraceEvent("ProtectCoordinator").detail("Address", coordinatorAddresses[i]).detail("Coordinators", describe(coordinatorAddresses)).backtrace(); g_simulator.protectedAddresses.insert(NetworkAddress(coordinatorAddresses[i].ip,coordinatorAddresses[i].port,true,false)); } g_random->randomShuffle(coordinatorAddresses);