Updated simulation to support managing exclusion and inclusion address

Added method for identifying acceptable availability process classes
Extended cluster availability function to ensure coordinators can be auto configured
Fixed availability function to allow protected processes to be considered as dead if not available
Added debug trace events for providing machine state when considering availability
Added trace event for protected coordinators
This commit is contained in:
Alvin Moore 2017-06-19 16:48:15 -07:00
parent cefaa2391d
commit 9553458b78
4 changed files with 107 additions and 34 deletions

View File

@ -705,7 +705,9 @@ ACTOR Future<CoordinatorsResult::Type> changeQuorum( Database cx, Reference<IQuo
if(g_network->isSimulated()) {
for(int i = 0; i < (desiredCoordinators.size()/2)+1; i++) {
g_simulator.protectedAddresses.insert( NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false) );
auto address = NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false);
g_simulator.protectedAddresses.insert(address);
TraceEvent("ProtectCoordinator").detail("Address", address).backtrace();
}
}

View File

@ -946,11 +946,12 @@ public:
machine.processes.push_back(m);
currentlyRebootingProcesses.erase(address);
addressMap[ m->address ] = m;
m->excluded = g_simulator.isExcluded(address);
m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics);
m->setGlobal(enNetworkConnections, (flowGlobalType) m->network);
TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detailext("zoneId", m->locality.zoneId());
TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detailext("zoneId", m->locality.zoneId()).detail("Excluded", m->excluded);
// FIXME: Sometimes, connections to/from this process will explicitly close
@ -962,14 +963,14 @@ public:
for (auto processInfo : getAllProcesses()) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->startingClass != ProcessClass::TesterClass) {
// Do not kill protected processes
if (protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
else if (processInfo->isAvailable())
if (processInfo->isAvailableClass()) {
// Mark all of the unavailable as dead
if (!processInfo->isAvailable())
processesDead.push_back(processInfo);
else if (protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
else
processesDead.push_back(processInfo);
processesLeft.push_back(processInfo);
}
}
return canKillProcesses(processesLeft, processesDead, KillInstantly, NULL);
@ -979,16 +980,20 @@ public:
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const
{
bool canSurvive = true;
int nQuorum = ((desiredCoordinators+1)/2)*2-1;
KillType newKt = kt;
if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
{
LocalityGroup processesLeft, processesDead;
std::vector<LocalityData> localitiesDead, localitiesLeft, badCombo;
std::set<Optional<Standalone<StringRef>>> uniqueMachines;
ASSERT(storagePolicy);
ASSERT(tLogPolicy);
for (auto processInfo : availableProcesses) {
processesLeft.add(processInfo->locality);
localitiesLeft.push_back(processInfo->locality);
uniqueMachines.insert(processInfo->locality.machineId());
}
for (auto processInfo : deadProcesses) {
processesDead.add(processInfo->locality);
@ -1023,8 +1028,13 @@ public:
canSurvive = false;
TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Reason", "storagePolicy does not validates against remaining processes.");
}
else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (nQuorum > uniqueMachines.size())) {
auto newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
canSurvive = false;
TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("Reason", "Not enough unique machines to perform auto configuration of coordinators.");
}
else {
TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("ZonesLeft", ::describeZones(localitiesLeft)).detail("ValidateRemaining", processesLeft.validate(tLogPolicy));
}
}
if (newKillType) *newKillType = newKt;
@ -1032,7 +1042,7 @@ public:
}
virtual void destroyProcess( ISimulator::ProcessInfo *p ) {
TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detailext("zoneId", p->locality.zoneId()).backtrace();
TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detailext("zoneId", p->locality.zoneId());
currentlyRebootingProcesses.insert(std::pair<NetworkAddress, ProcessInfo*>(p->address, p));
std::vector<ProcessInfo*>& processes = machines[ p->locality.zoneId().get() ].processes;
if( p != processes.back() ) {
@ -1105,7 +1115,7 @@ public:
auto ktOrig = kt;
if (killIsSafe) ASSERT( kt == ISimulator::RebootAndDelete ); // Only types of "safe" kill supported so far
TEST(true); // Trying to killing a machine
TEST(true); // Trying to killing a machine
TEST(kt == KillInstantly); // Trying to kill instantly
TEST(kt == InjectFaults); // Trying to kill by injecting faults
@ -1124,6 +1134,12 @@ public:
processesOnMachine++;
}
// Do nothing, if no processes to kill
if (processesOnMachine == 0) {
TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target had no processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace();
return false;
}
// Check if machine can be removed, if requested
if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
{
@ -1132,12 +1148,13 @@ public:
for (auto machineRec : machines) {
for (auto processInfo : machineRec.second.processes) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->startingClass != ProcessClass::TesterClass) {
if (protectedAddresses.count(processInfo->address))
if (processInfo->isAvailableClass()) {
if (!processInfo->isAvailable())
processesDead.push_back(processInfo);
else if (protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
else if (processInfo->isAvailable() && (machineRec.second.zoneId != zoneId)) {
else if (machineRec.second.zoneId != zoneId)
processesLeft.push_back(processInfo);
}
// Add processes from dead machines and datacenter machines to dead group
else
processesDead.push_back(processInfo);
@ -1155,9 +1172,18 @@ public:
for (auto process : processesLeft) {
TraceEvent("DeadMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process));
}
for (auto process : processesDead) {
TraceEvent("DeadMachineVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process));
}
}
else {
TraceEvent("ClearMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
for (auto process : processesLeft) {
TraceEvent("ClearMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process));
}
for (auto process : processesDead) {
TraceEvent("ClearMachineVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process));
}
}
}
@ -1170,6 +1196,14 @@ public:
return false;
}
// Check if any processes on machine are rebooting
if ( processesOnMachine != processesPerMachine) {
TEST(true); //Attempted reboot, but the target did not have all of its processes running
TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target did not have all of its processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace();
return false;
}
TraceEvent("KillMachine", zoneId).detailext("ZoneId", zoneId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KilledMachines", killedMachines).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig).detail("killIsSafe", killIsSafe);
if (kt < RebootAndDelete ) {
for (auto& process : machines[zoneId].processes) {
@ -1215,13 +1249,15 @@ public:
for (auto machineRec : machines) {
for (auto processInfo : machineRec.second.processes) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->startingClass != ProcessClass::TesterClass) {
// Do not kill protected processes
if (protectedAddresses.count(processInfo->address))
if (processInfo->isAvailableClass()) {
// Mark all of the unavailable as dead
if (!processInfo->isAvailable())
processesDead.push_back(processInfo);
else if (protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
else if (processInfo->isAvailable() && (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end())) {
// Keep all not in the datacenter zones
else if (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end())
processesLeft.push_back(processInfo);
}
else
processesDead.push_back(processInfo);
}
@ -1391,6 +1427,7 @@ public:
std::map<Optional<Standalone<StringRef>>, MachineInfo > machines;
std::map<NetworkAddress, ProcessInfo*> addressMap;
std::map<ProcessInfo*, Promise<Void>> filesDeadMap;
std::set<AddressExclusion> exclusionSet;
//tasks is guarded by ISimulator::mutex
std::priority_queue<Task, std::vector<Task>> tasks;

View File

@ -71,20 +71,35 @@ public:
: name(name), locality(locality), startingClass(startingClass), address(address), dataFolder(dataFolder),
network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0),
rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), io_timeout_injected(false)
{}
fault_injection_r(0), machine(0), io_timeout_injected(false) {}
Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }
bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; }
bool isAvailable() const { return !excluded && isReliable(); }
// Returns true if the class represents an acceptable worker
bool isAvailableClass() const {
switch (startingClass._class) {
case ProcessClass::UnsetClass: return true;
case ProcessClass::StorageClass: return true;
case ProcessClass::TransactionClass: return true;
case ProcessClass::ResolutionClass: return false;
case ProcessClass::ProxyClass: return false;
case ProcessClass::MasterClass: return false;
case ProcessClass::TesterClass: return false;
case ProcessClass::StatelessClass: return false;
case ProcessClass::LogClass: return true;
default: return false;
}
}
inline flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; };
inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; };
std::string toString() const {
return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s coord: %s data: %s",
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), coordinationFolder, dataFolder); }
return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s coord: %s data: %s excluded: %d",
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), coordinationFolder, dataFolder, excluded); }
// Members not for external use
Promise<KillType> shutdownSignal;
@ -135,6 +150,19 @@ public:
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const = 0;
virtual bool isAvailable() const = 0;
virtual void excludeAddress(NetworkAddress const& address) {
excludedAddresses.insert(address);
}
virtual void includeAddress(NetworkAddress const& address) {
excludedAddresses.erase(address);
}
virtual void includeAllAddresses() {
excludedAddresses.clear();
}
virtual bool isExcluded(NetworkAddress const& address) const {
return excludedAddresses.count(address) == 0;
}
virtual void disableSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
swapsDisabled.insert(zoneId);
}
@ -201,6 +229,7 @@ protected:
private:
std::set<Optional<Standalone<StringRef>>> swapsDisabled;
std::set<NetworkAddress> excludedAddresses;
bool allSwapsDisabled;
};

View File

@ -198,15 +198,16 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
bool runBackupAgents)
{
state ISimulator::ProcessInfo *simProcess = g_simulator.getCurrentProcess();
state int cycles =0;
state UID randomId = g_nondeterministic_random->randomUniqueID();
state int cycles = 0;
loop {
auto waitTime = SERVER_KNOBS->MIN_REBOOT_TIME + (SERVER_KNOBS->MAX_REBOOT_TIME - SERVER_KNOBS->MIN_REBOOT_TIME) * g_random->random01();
cycles ++;
TraceEvent("SimulatedFDBDWait").detail("Cycles", cycles)
TraceEvent("SimulatedFDBDWait").detail("Cycles", cycles).detail("RandomId", randomId)
.detail("ProcessAddress", NetworkAddress(ip, port, true, false))
.detailext("ZoneId", localities.zoneId())
.detail("waitTime", waitTime);
.detail("waitTime", waitTime).detail("Port", port);
Void _ = wait( delay( waitTime ) );
@ -215,10 +216,11 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
state Future<ISimulator::KillType> onShutdown = process->onShutdown();
try {
TraceEvent("SimulatedRebooterStarting", localities.zoneId()).detail("Cycles", cycles)
TraceEvent("SimulatedRebooterStarting", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
.detailext("ZoneId", localities.zoneId())
.detailext("DataHall", localities.dataHallId())
.detail("ProcessAddress", process->address.toString())
.detail("ProcessExcluded", process->excluded)
.detail("UsingSSL", useSSL);
TraceEvent("ProgramStart").detail("Cycles", cycles)
.detail("SourceVersion", getHGVersion())
@ -255,8 +257,9 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
TraceEvent(e.code() == error_code_actor_cancelled || e.code() == error_code_file_not_found || destructed ? SevInfo : SevError, "SimulatedFDBDTerminated", localities.zoneId()).error(e, true);
}
TraceEvent("SimulatedFDBDDone", localities.zoneId()).detail("Cycles", cycles)
TraceEvent("SimulatedFDBDDone", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
.detail("ProcessAddress", process->address)
.detail("ProcessExcluded", process->excluded)
.detailext("ZoneId", localities.zoneId())
.detail("KillType", onShutdown.isReady() ? onShutdown.get() : ISimulator::None);
@ -280,21 +283,23 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
g_simulator.destroyProcess( process ); // Leak memory here; the process may be used in other parts of the simulation
auto shutdownResult = onShutdown.get();
TraceEvent("SimulatedFDBDShutdown", localities.zoneId()).detail("Cycles", cycles)
TraceEvent("SimulatedFDBDShutdown", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
.detail("ProcessAddress", process->address)
.detail("ProcessExcluded", process->excluded)
.detailext("ZoneId", localities.zoneId())
.detail("KillType", shutdownResult);
if( shutdownResult < ISimulator::RebootProcessAndDelete ) {
TraceEvent("SimulatedFDBDLowerReboot", localities.zoneId()).detail("Cycles", cycles)
TraceEvent("SimulatedFDBDLowerReboot", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
.detail("ProcessAddress", process->address)
.detail("ProcessExcluded", process->excluded)
.detailext("ZoneId", localities.zoneId())
.detail("KillType", shutdownResult);
return onShutdown.get();
}
if( onShutdown.get() == ISimulator::RebootProcessAndDelete ) {
TraceEvent("SimulatedFDBDRebootAndDelete", localities.zoneId()).detail("Cycles", cycles)
TraceEvent("SimulatedFDBDRebootAndDelete", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
.detail("ProcessAddress", process->address)
.detailext("ZoneId", localities.zoneId())
.detail("KillType", shutdownResult);
@ -311,7 +316,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
}
}
else {
TraceEvent("SimulatedFDBDJustRepeat", localities.zoneId()).detail("Cycles", cycles)
TraceEvent("SimulatedFDBDJustRepeat", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
.detail("ProcessAddress", process->address)
.detailext("ZoneId", localities.zoneId())
.detail("KillType", shutdownResult);
@ -744,7 +749,7 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
g_random->randomShuffle(coordinatorAddresses);
for(int i = 0; i < (coordinatorAddresses.size()/2)+1; i++) {
TraceEvent("ProtectMachine").detail("Address", coordinatorAddresses[i]).detail("Coordinators", coordinatorAddresses.size()).backtrace();
TraceEvent("ProtectCoordinator").detail("Address", coordinatorAddresses[i]).detail("Coordinators", describe(coordinatorAddresses)).backtrace();
g_simulator.protectedAddresses.insert(NetworkAddress(coordinatorAddresses[i].ip,coordinatorAddresses[i].port,true,false));
}
g_random->randomShuffle(coordinatorAddresses);