the data distributor and ratekeeper are not included in id_used, but when comparing equally good options we prefer to avoid sharing with those roles

excluded data distributor and ratekeeper were improperly killed when the best option was also excluded
This commit is contained in:
Evan Tschannen 2019-03-23 13:25:36 -07:00
parent 10988f89d9
commit b51a24453e

View File

@ -438,7 +438,7 @@ public:
}
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<WorkerDetails>> fitness_workers;
std::map<std::pair<ProcessClass::Fitness,int>, std::pair<vector<WorkerDetails>,vector<WorkerDetails>>> fitness_workers;
for( auto& it : id_worker ) {
auto fitness = it.second.details.processClass.machineClassFitness( role );
@ -446,16 +446,23 @@ public:
fitness = std::max(fitness, ProcessClass::ExcludeFit);
}
if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.details.interf.locality.dcId()==dcId ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(it.second.details);
if ((db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == it.first) ||
(db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == it.first)) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].second.push_back(it.second.details);
} else {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].first.push_back(it.second.details);
}
}
}
for( auto& it : fitness_workers ) {
auto& w = it.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
id_used[w[i].interf.locality.processId()]++;
return WorkerFitnessInfo(w[i], it.first.first, it.first.second);
for( int j=0; j < 2; j++ ) {
auto& w = j==0 ? it.second.first : it.second.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
id_used[w[i].interf.locality.processId()]++;
return WorkerFitnessInfo(w[i], it.first.first, it.first.second);
}
}
}
@ -463,7 +470,7 @@ public:
}
vector<WorkerDetails> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<WorkerDetails>> fitness_workers;
std::map<std::pair<ProcessClass::Fitness,int>, std::pair<vector<WorkerDetails>,vector<WorkerDetails>>> fitness_workers;
vector<WorkerDetails> results;
if (amount <= 0)
return results;
@ -472,18 +479,25 @@ public:
auto fitness = it.second.details.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && it.second.details.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.details.interf.id() != minWorker.get().worker.interf.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(it.second.details);
if ((db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == it.first) ||
(db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == it.first)) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].second.push_back(it.second.details);
} else {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].first.push_back(it.second.details);
}
}
}
for( auto& it : fitness_workers ) {
auto& w = it.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
results.push_back(w[i]);
id_used[w[i].interf.locality.processId()]++;
if( results.size() == amount )
return results;
for( int j=0; j < 2; j++ ) {
auto& w = j==0 ? it.second.first : it.second.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
results.push_back(w[i]);
id_used[w[i].interf.locality.processId()]++;
if( results.size() == amount )
return results;
}
}
}
@ -572,12 +586,6 @@ public:
void updateKnownIds(std::map< Optional<Standalone<StringRef>>, int>* id_used) {
(*id_used)[masterProcessId]++;
(*id_used)[clusterControllerProcessId]++;
if (db.serverInfo->get().distributor.present()) {
(*id_used)[db.serverInfo->get().distributor.get().locality.processId()]++;
}
if (db.serverInfo->get().ratekeeper.present()) {
(*id_used)[db.serverInfo->get().ratekeeper.get().locality.processId()]++;
}
}
RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) {
@ -975,12 +983,6 @@ public:
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++;
if (db.serverInfo->get().distributor.present()) {
id_used[db.serverInfo->get().distributor.get().locality.processId()]++;
}
if (db.serverInfo->get().ratekeeper.present()) {
id_used[db.serverInfo->get().ratekeeper.get().locality.processId()]++;
}
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true);
if ( oldMasterFit < mworker.fitness )
@ -1425,8 +1427,10 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
if (!self->recruitingRatekeeperID.present() && db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId())) {
auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::RateKeeper);
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) ||
rkFitness > bestFitnessForRK || rkWorker.priorityInfo.isExcluded) {
if(rkWorker.priorityInfo.isExcluded) {
rkFitness == ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK) {
TraceEvent("CC_HaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
.detail("Excluded", rkWorker.priorityInfo.isExcluded)
.detail("Fitness", rkFitness).detail("BestFitness", bestFitnessForRK);
@ -1437,8 +1441,10 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
if (!self->recruitingDistributor && db.distributor.present() && self->id_worker.count(db.distributor.get().locality.processId())) {
auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) ||
ddFitness > bestFitnessForDD || ddWorker.priorityInfo.isExcluded) {
if(ddWorker.priorityInfo.isExcluded) {
ddFitness == ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD) {
TraceEvent("CC_HaltDD", self->id).detail("DDID", db.distributor.get().id())
.detail("Excluded", ddWorker.priorityInfo.isExcluded)
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD);
@ -2487,6 +2493,9 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
if (self->onMasterIsBetter(worker, ProcessClass::DataDistributor)) {
worker = self->id_worker[self->masterProcessId.get()].details;
}
if (self->db.serverInfo->get().distributor.present() && self->db.serverInfo->get().distributor.get().locality.processId() == worker.interf.locality.processId()) {
throw no_more_servers(); // Avoid recruiting an existing one.
}
InitializeDataDistributorRequest req(g_random->randomUniqueID());
TraceEvent("CC_DataDistributorRecruit", self->id).detail("Addr", worker.interf.address());