1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-25 00:30:00 +08:00

re-enabled better master exists

the cluster controller can choose a better data center for itself and let the workers know where the next cluster controller should be recruited
This commit is contained in:
Evan Tschannen 2018-02-09 16:48:55 -08:00
parent 63a9f2aed6
commit c7b3be5b19
15 changed files with 440 additions and 312 deletions

@ -93,7 +93,7 @@ private:
struct LeaderInfo {
UID changeID;
uint64_t mask = ~(15ll << 60);
uint64_t mask = ~(127ll << 57);
Value serializedInfo;
bool forward; // If true, serializedInfo is a connection string instead!
@ -103,12 +103,12 @@ struct LeaderInfo {
bool operator < (LeaderInfo const& r) const { return changeID < r.changeID; }
bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; }
// The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better
void updateChangeID(uint64_t processClassFitness, bool isExcluded) {
changeID = UID( ( (uint64_t)isExcluded << 63) | (processClassFitness << 60) | (changeID.first() & mask ), changeID.second() );
// The first 7 bits of ChangeID represent cluster controller process class fitness, the lower the better
void updateChangeID(ClusterControllerPriorityInfo info) {
changeID = UID( ((uint64_t)info.isExcluded << 63) | ((uint64_t)info.dcFitness << 60) | ((uint64_t)info.processClassFitness << 57) | (changeID.first() & mask), changeID.second() );
}
// All but the first 4 bits are used to represent process id
// All but the first 7 bits are used to represent process id
bool equalInternalId(LeaderInfo const& leaderInfo) const {
if ( (changeID.first() & mask) == (leaderInfo.changeID.first() & mask) && changeID.second() == leaderInfo.changeID.second() ) {
return true;

@ -585,4 +585,41 @@ static bool addressExcluded( std::set<AddressExclusion> const& exclusions, Netwo
return exclusions.count( AddressExclusion(addr.ip, addr.port) ) || exclusions.count( AddressExclusion(addr.ip) );
}
struct ClusterControllerPriorityInfo {
enum DCFitness { FitnessPrimary, FitnessRemote, FitnessPreferred, FitnessUnknown, FitnessBad }; //cannot be larger than 7 because of leader election mask
static DCFitness calculateDCFitness(Optional<Key> dcId, vector<Optional<Key>> dcPriority) {
if(!dcPriority.size()) {
return FitnessUnknown;
} else if(dcPriority.size() == 1) {
if(dcId == dcPriority[0]) {
return FitnessPreferred;
} else {
return FitnessUnknown;
}
} else {
if(dcId == dcPriority[0]) {
return FitnessPrimary;
} else if(dcId == dcPriority[1]) {
return FitnessRemote;
} else {
return FitnessBad;
}
}
}
uint8_t processClassFitness;
bool isExcluded;
uint8_t dcFitness;
bool operator== (ClusterControllerPriorityInfo const& r) const { return processClassFitness == r.processClassFitness && isExcluded == r.isExcluded && dcFitness == r.dcFitness; }
ClusterControllerPriorityInfo(uint8_t processClassFitness, bool isExcluded, uint8_t dcFitness) : processClassFitness(processClassFitness), isExcluded(isExcluded), dcFitness(dcFitness) {}
template <class Ar>
void serialize(Ar& ar) {
ar & processClassFitness & isExcluded & dcFitness;
}
};
#endif

@ -27,7 +27,7 @@
struct ProcessClass {
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, InvalidClass = -1 };
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, ExcludeFit, NeverAssign };
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController };
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
int16_t _class;

@ -49,14 +49,14 @@ struct WorkerInfo : NonCopyable {
WorkerInterface interf;
ProcessClass initialClass;
ProcessClass processClass;
bool isExcluded;
ClusterControllerPriorityInfo priorityInfo;
WorkerInfo() : gen(-1), reboots(0) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, bool isExcluded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded) {}
WorkerInfo() : gen(-1), reboots(0), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo) {}
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), isExcluded(r.isExcluded) {}
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), priorityInfo(r.priorityInfo) {}
void operator=( WorkerInfo&& r ) noexcept(true) {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
@ -65,10 +65,19 @@ struct WorkerInfo : NonCopyable {
interf = std::move(r.interf);
initialClass = r.initialClass;
processClass = r.processClass;
isExcluded = r.isExcluded;
priorityInfo = r.priorityInfo;
}
};
struct WorkerFitnessInfo {
std::pair<WorkerInterface, ProcessClass> worker;
ProcessClass::Fitness fitness;
int used;
WorkerFitnessInfo() : fitness(ProcessClass::NeverAssign), used(0) {}
WorkerFitnessInfo(std::pair<WorkerInterface, ProcessClass> worker, ProcessClass::Fitness fitness, int used) : worker(worker), fitness(fitness), used(used) {}
};
class ClusterControllerData {
public:
struct DBInfo {
@ -192,35 +201,6 @@ public:
throw no_more_servers();
}
//FIXME: get master in the same datacenter as the proxies and resolvers for ratekeeper, however this is difficult because the master is recruited before we know the cluster's configuration
std::pair<WorkerInterface, ProcessClass> getMasterWorker( DatabaseConfiguration const& conf, bool checkStable = false ) {
ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;
bool bestIsClusterController = false;
int numEquivalent = 1;
for( auto& it : id_worker ) {
auto fit = it.second.processClass.machineClassFitness( ProcessClass::Master );
if(conf.isExcludedServer(it.second.interf.address())) {
fit = std::max(fit, ProcessClass::ExcludeFit);
}
if( workerAvailable(it.second, checkStable) && fit != ProcessClass::NeverAssign ) {
if( fit < bestFit || (fit == bestFit && bestIsClusterController) ) {
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
bestFit = fit;
numEquivalent = 1;
bestIsClusterController = clusterControllerProcessId == it.first;
}
else if( fit == bestFit && clusterControllerProcessId != it.first && g_random->random01() < 1.0/++numEquivalent )
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
}
}
if( bestInfo.present() )
return bestInfo.get();
throw no_more_servers();
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForSeedServers( DatabaseConfiguration const& conf, IRepPolicyRef const& policy, Optional<Optional<Standalone<StringRef>>> const& dcId = Optional<Optional<Standalone<StringRef>>>() ) {
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
@ -405,38 +385,15 @@ public:
return results;
}
struct WorkerFitnessInfo {
std::pair<WorkerInterface, ProcessClass> worker;
ProcessClass::Fitness fitness;
int used;
WorkerFitnessInfo(std::pair<WorkerInterface, ProcessClass> worker, ProcessClass::Fitness fitness, int used) : worker(worker), fitness(fitness), used(used) {}
};
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && fitness != ProcessClass::NeverAssign && it.second.interf.locality.dcId()==dcId ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
if(conf.isExcludedServer(it.second.interf.address())) {
fitness = std::max(fitness, ProcessClass::ExcludeFit);
}
}
for( auto& it : fitness_workers ) {
auto& w = it.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
id_used[w[i].first.locality.processId()]++;
return WorkerFitnessInfo(w[i], it.first.first, it.first.second);
}
}
//If we did not find enough workers in the primary data center, add workers from other data centers
fitness_workers.clear();
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && fitness != ProcessClass::NeverAssign && it.second.interf.locality.dcId()!=dcId ) {
if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.interf.locality.dcId()==dcId ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
}
}
@ -481,102 +438,54 @@ public:
return results;
}
struct InDatacenterFitness {
ProcessClass::Fitness proxyFit;
ProcessClass::Fitness resolverFit;
int proxyCount;
int resolverCount;
InDatacenterFitness( ProcessClass::Fitness proxyFit, ProcessClass::Fitness resolverFit, int proxyCount, int resolverCount)
: proxyFit(proxyFit), resolverFit(resolverFit), proxyCount(proxyCount), resolverCount(resolverCount) {}
InDatacenterFitness() : proxyFit( ProcessClass::NeverAssign ), resolverFit( ProcessClass::NeverAssign ) {}
InDatacenterFitness( vector<std::pair<WorkerInterface, ProcessClass>> proxies, vector<std::pair<WorkerInterface, ProcessClass>> resolvers ) {
proxyFit = ProcessClass::BestFit;
resolverFit = ProcessClass::BestFit;
for(auto it: proxies) {
proxyFit = std::max(proxyFit, it.second.machineClassFitness( ProcessClass::Proxy ));
}
for(auto it: resolvers) {
resolverFit = std::max(resolverFit, it.second.machineClassFitness( ProcessClass::Resolver ));
}
proxyCount = proxies.size();
resolverCount = resolvers.size();
}
InDatacenterFitness( vector<MasterProxyInterface> proxies, vector<ResolverInterface> resolvers, vector<ProcessClass> proxyClasses, vector<ProcessClass> resolverClasses ) {
std::set<Optional<Standalone<StringRef>>> dcs;
proxyFit = ProcessClass::BestFit;
resolverFit = ProcessClass::BestFit;
for(int i = 0; i < proxies.size(); i++) {
dcs.insert(proxies[i].locality.dcId());
proxyFit = std::max(proxyFit, proxyClasses[i].machineClassFitness( ProcessClass::Proxy ));
}
for(int i = 0; i < resolvers.size(); i++) {
dcs.insert(resolvers[i].locality.dcId());
resolverFit = std::max(resolverFit, resolverClasses[i].machineClassFitness( ProcessClass::Resolver ));
}
proxyCount = proxies.size();
resolverCount = resolvers.size();
}
bool operator < (InDatacenterFitness const& r) const {
int lmax = std::max(resolverFit,proxyFit);
int lmin = std::min(resolverFit,proxyFit);
int rmax = std::max(r.resolverFit,r.proxyFit);
int rmin = std::min(r.resolverFit,r.proxyFit);
if( lmax != rmax ) return lmax < rmax;
if( lmin != rmin ) return lmin < rmin;
if(proxyCount != r.proxyCount) return proxyCount > r.proxyCount;
return resolverCount > r.resolverCount;
}
bool betterInDatacenterFitness (InDatacenterFitness const& r) const {
int lmax = std::max(resolverFit,proxyFit);
int lmin = std::min(resolverFit,proxyFit);
int rmax = std::max(r.resolverFit,r.proxyFit);
int rmin = std::min(r.resolverFit,r.proxyFit);
if( lmax != rmax ) return lmax < rmax;
if( lmin != rmin ) return lmin < rmin;
return false;
}
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
struct TLogFitness {
struct RoleFitness {
ProcessClass::Fitness bestFit;
ProcessClass::Fitness worstFit;
int tlogCount;
int count;
TLogFitness( ProcessClass::Fitness bestFit, ProcessClass::Fitness worstFit, int tlogCount) : bestFit(bestFit), worstFit(worstFit), tlogCount(tlogCount) {}
RoleFitness(int bestFit, int worstFit, int count) : bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), count(count) {}
TLogFitness() : bestFit( ProcessClass::NeverAssign ), worstFit( ProcessClass::NeverAssign ), tlogCount(0) {}
RoleFitness(int fitness, int count) : bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), count(count) {}
TLogFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
RoleFitness() : bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), count(0) {}
RoleFitness(RoleFitness first, RoleFitness second) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count) {}
RoleFitness( vector<std::pair<WorkerInterface, ProcessClass>> workers, ProcessClass::ClusterRole role ) {
worstFit = ProcessClass::BestFit;
bestFit = ProcessClass::NeverAssign;
for(auto it : tlogs) {
auto thisFit = it.second.machineClassFitness( ProcessClass::TLog );
for(auto it : workers) {
auto thisFit = it.second.machineClassFitness( role );
worstFit = std::max(worstFit, thisFit);
bestFit = std::min(bestFit, thisFit);
}
tlogCount = tlogs.size();
count = workers.size();
}
bool operator < (TLogFitness const& r) const {
RoleFitness( std::vector<ProcessClass> classes, ProcessClass::ClusterRole role ) {
worstFit = ProcessClass::BestFit;
bestFit = ProcessClass::NeverAssign;
for(auto it : classes) {
auto thisFit = it.machineClassFitness( role );
worstFit = std::max(worstFit, thisFit);
bestFit = std::min(bestFit, thisFit);
}
count = classes.size();
}
bool operator < (RoleFitness const& r) const {
if (worstFit != r.worstFit) return worstFit < r.worstFit;
if (bestFit != r.bestFit) return bestFit < r.bestFit;
return tlogCount > r.tlogCount;
return count > r.count;
}
bool operator == (TLogFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && tlogCount == r.tlogCount; }
bool betterFitness (RoleFitness const& r) const {
if (worstFit != r.worstFit) return worstFit < r.worstFit;
if (bestFit != r.bestFit) return bestFit < r.bestFit;
return false;
}
bool operator == (RoleFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && count == r.count; }
};
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
@ -602,21 +511,21 @@ public:
result.remoteTLogs.push_back(remoteLogs[i].first);
}
auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.configuration.logRouterCount, req.configuration, id_used );
auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.configuration.getDesiredLogRouters(), req.configuration, id_used );
for(int i = 0; i < logRouters.size(); i++) {
result.logRouters.push_back(logRouters[i].first);
}
//FIXME: fitness for logs is wrong
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( TLogFitness(remoteLogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ) ) {
( ( RoleFitness(remoteLogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ) ||
( RoleFitness(logRouters, ProcessClass::LogRouter) > RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.configuration.getDesiredLogRouters()) ) ) ) {
throw operation_failed();
}
return result;
}
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req, Optional<Key> dcId ) {
ErrorOr<RecruitFromConfigurationReply> findWorkersForConfiguration( RecruitFromConfigurationRequest const& req, Optional<Key> dcId ) {
RecruitFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++;
@ -643,6 +552,7 @@ public:
result.tLogs.push_back(tlogs[i].first);
}
std::vector<std::pair<WorkerInterface, ProcessClass>> satelliteLogs;
if(req.configuration.satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
if( dcId == req.configuration.primaryDcId ) {
@ -651,15 +561,15 @@ public:
satelliteDCs.insert( req.configuration.remoteSatelliteDcIds.begin(), req.configuration.remoteSatelliteDcIds.end() );
}
//FIXME: recruitment does not respect usable_dcs, a.k.a if usable_dcs is 1 we should recruit all tlogs in one data center
auto satelliteLogs = getWorkersForTlogs( req.configuration, req.configuration.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(), req.configuration.satelliteTLogPolicy, id_used, false, satelliteDCs );
satelliteLogs = getWorkersForTlogs( req.configuration, req.configuration.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(), req.configuration.satelliteTLogPolicy, id_used, false, satelliteDCs );
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
}
}
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, id_used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, id_used );
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, req.configuration, id_used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, req.configuration, id_used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, id_used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, id_used, first_resolver );
@ -667,17 +577,17 @@ public:
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = InDatacenterFitness(proxies, resolvers);
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
//FIXME: fitness for logs is wrong
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
fitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
( RoleFitness(tlogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
( req.configuration.satelliteTLogReplicationFactor > 0 && RoleFitness(satelliteLogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs()) ) ||
RoleFitness(proxies, ProcessClass::Proxy) > RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies()) ||
RoleFitness(resolvers, ProcessClass::Resolver) > RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers()) ) ) {
return operation_failed();
}
return result;
@ -685,15 +595,38 @@ public:
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) {
if(req.configuration.primaryDcId.present()) {
bool setPrimaryDesired = false;
try {
return findWorkersForConfiguration(req, req.configuration.primaryDcId);
auto reply = findWorkersForConfiguration(req, req.configuration.primaryDcId);
setPrimaryDesired = true;
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.primaryDcId);
dcPriority.push_back(req.configuration.remoteDcId);
desiredDcIds.set(dcPriority);
if(reply.isError()) {
throw reply.getError();
} else if(req.configuration.primaryDcId == clusterControllerDcId) {
return reply.get();
}
throw no_more_servers();
} catch( Error& e ) {
if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) {
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
return findWorkersForConfiguration(req, req.configuration.remoteDcId);
} else {
if (e.code() != error_code_no_more_servers) {
throw;
}
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
auto reply = findWorkersForConfiguration(req, req.configuration.remoteDcId);
if(!setPrimaryDesired) {
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.remoteDcId);
dcPriority.push_back(req.configuration.primaryDcId);
desiredDcIds.set(dcPriority);
}
if(reply.isError()) {
throw reply.getError();
} else if(req.configuration.remoteDcId == clusterControllerDcId) {
return reply.get();
}
throw;
}
} else {
RecruitFromConfigurationReply result;
@ -706,58 +639,73 @@ public:
result.tLogs.push_back(tlogs[i].first);
}
auto datacenters = getDatacenters( req.configuration );
InDatacenterFitness bestFitness;
int numEquivalent = 1;
if(req.recruitSeedServers) {
auto primaryStorageServers = getWorkersForSeedServers( req.configuration, req.configuration.storagePolicy );
for(int i = 0; i < primaryStorageServers.size(); i++)
result.storageServers.push_back(primaryStorageServers[i].first);
}
auto datacenters = getDatacenters( req.configuration );
RoleFitness bestFitness;
int numEquivalent = 1;
Optional<Key> bestDC;
for(auto dcId : datacenters ) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, used );
try {
//SOMEDAY: recruitment in other DCs besides the clusterControllerDcID will not account for the processes used by the master and cluster controller properly.
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, req.configuration, used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, req.configuration, used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = InDatacenterFitness(proxies, resolvers);
if(fitness < bestFitness) {
bestFitness = fitness;
numEquivalent = 1;
result.resolvers = vector<WorkerInterface>();
result.proxies = vector<WorkerInterface>();
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
} else if( fitness == bestFitness && g_random->random01() < 1.0/++numEquivalent ) {
result.resolvers = vector<WorkerInterface>();
result.proxies = vector<WorkerInterface>();
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
auto fitness = RoleFitness( RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver) );
if(dcId == clusterControllerDcId) {
bestFitness = fitness;
bestDC = dcId;
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
break;
} else {
if(fitness < bestFitness) {
bestFitness = fitness;
numEquivalent = 1;
bestDC = dcId;
} else if( fitness == bestFitness && g_random->random01() < 1.0/++numEquivalent ) {
bestDC = dcId;
}
}
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
throw;
}
}
}
ASSERT(bestFitness != InDatacenterFitness());
if(bestDC != clusterControllerDcId) {
vector<Optional<Key>> dcPriority;
dcPriority.push_back(bestDC);
desiredDcIds.set(dcPriority);
throw no_more_servers();
}
//If this cluster controller dies, do not prioritize recruiting the next one in the same DC
desiredDcIds.set(vector<Optional<Key>>());
TraceEvent("findWorkersForConfig").detail("replication", req.configuration.tLogReplicationFactor)
.detail("desiredLogs", req.configuration.getDesiredLogs()).detail("actualLogs", result.tLogs.size())
.detail("desiredProxies", req.configuration.getDesiredProxies()).detail("actualProxies", result.proxies.size())
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
( RoleFitness(tlogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > RoleFitness(std::min(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), std::max(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), req.configuration.getDesiredProxies()+req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
@ -765,6 +713,39 @@ public:
}
}
void checkPrimaryDC() {
if(db.config.primaryDcId.present() && db.config.primaryDcId != clusterControllerDcId) {
try {
std::map< Optional<Standalone<StringRef>>, int> id_used;
getWorkerForRoleInDatacenter(db.config.primaryDcId, ProcessClass::ClusterController, ProcessClass::ExcludeFit, db.config, id_used, true);
getWorkerForRoleInDatacenter(db.config.primaryDcId, ProcessClass::Master, ProcessClass::ExcludeFit, db.config, id_used, true);
std::set<Optional<Key>> primaryDC;
primaryDC.insert(db.config.primaryDcId);
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC);
if(db.config.satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
satelliteDCs.insert( db.config.primarySatelliteDcIds.begin(), db.config.primarySatelliteDcIds.end() );
getWorkersForTlogs(db.config, db.config.satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(), db.config.satelliteTLogPolicy, id_used, true, satelliteDCs);
}
getWorkerForRoleInDatacenter( db.config.primaryDcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( db.config.primaryDcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
vector<Optional<Key>> dcPriority;
dcPriority.push_back(db.config.primaryDcId);
dcPriority.push_back(db.config.remoteDcId);
desiredDcIds.set(dcPriority);
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
throw;
}
}
}
}
//FIXME: determine when to fail the cluster controller when a primaryDC has not been set
bool betterMasterExists() {
ServerDBInfo dbi = db.serverInfo->get();
@ -772,6 +753,8 @@ public:
return false;
}
checkPrimaryDC();
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
if(masterWorker == id_worker.end()) {
@ -780,13 +763,35 @@ public:
// Get tlog processes
std::vector<std::pair<WorkerInterface, ProcessClass>> tlogs;
for( auto& it : dbi.logSystemConfig.tLogs[0].tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
if ( tlogWorker->second.isExcluded )
return true;
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
std::vector<std::pair<WorkerInterface, ProcessClass>> remote_tlogs;
std::vector<std::pair<WorkerInterface, ProcessClass>> satellite_tlogs;
std::vector<std::pair<WorkerInterface, ProcessClass>> log_routers;
for( auto& logSet : dbi.logSystemConfig.tLogs ) {
for( auto& it : logSet.tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
if ( tlogWorker->second.priorityInfo.isExcluded )
return true;
if(logSet.isLocal && logSet.hasBestPolicy > HasBestPolicyNone) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else if(logSet.isLocal) {
satellite_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else {
remote_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
}
for( auto& it : logSet.logRouters ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
if ( tlogWorker->second.priorityInfo.isExcluded )
return true;
log_routers.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
}
// Get proxy classes
@ -795,7 +800,7 @@ public:
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
if ( proxyWorker->second.isExcluded )
if ( proxyWorker->second.priorityInfo.isExcluded )
return true;
proxyClasses.push_back(proxyWorker->second.processClass);
}
@ -806,7 +811,7 @@ public:
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
if ( resolverWorker->second.isExcluded )
if ( resolverWorker->second.priorityInfo.isExcluded )
return true;
resolverClasses.push_back(resolverWorker->second.processClass);
}
@ -817,58 +822,74 @@ public:
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
}
auto mworker = getMasterWorker(db.config, true);
ProcessClass::Fitness newMasterFit = mworker.second.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(mworker.first.address())) {
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++;
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true);
if ( oldMasterFit < newMasterFit )
if ( oldMasterFit < mworker.fitness )
return false;
if ( oldMasterFit > newMasterFit )
if ( oldMasterFit > mworker.fitness )
return true;
// Check tLog fitness
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++;
id_used[clusterControllerProcessId]++;
std::set<Optional<Key>> primaryDC;
std::set<Optional<Key>> satelliteDCs;
std::set<Optional<Key>> remoteDC;
if(db.config.primaryDcId.present()) {
primaryDC.insert(clusterControllerDcId == db.config.primaryDcId ? db.config.primaryDcId : db.config.remoteDcId);
remoteDC.insert(clusterControllerDcId == db.config.primaryDcId ? db.config.remoteDcId : db.config.primaryDcId);
if(db.config.satelliteTLogReplicationFactor > 0) {
if( clusterControllerDcId == db.config.primaryDcId ) {
satelliteDCs.insert( db.config.primarySatelliteDcIds.begin(), db.config.primarySatelliteDcIds.end() );
} else {
satelliteDCs.insert( db.config.remoteSatelliteDcIds.begin(), db.config.remoteSatelliteDcIds.end() );
}
}
}
TLogFitness oldTLogFit(tlogs);
TLogFitness newTLotFit(getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true));
// Check tLog fitness
RoleFitness oldTLogFit(tlogs, ProcessClass::TLog);
RoleFitness newTLotFit(getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC), ProcessClass::TLog);
if(oldTLogFit < newTLotFit) return false;
RoleFitness oldSatelliteTLogFit(satellite_tlogs, ProcessClass::TLog);
RoleFitness newSatelliteTLotFit(db.config.satelliteTLogReplicationFactor > 0 ? getWorkersForTlogs(db.config, db.config.satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(), db.config.satelliteTLogPolicy, id_used, true, satelliteDCs) : satellite_tlogs, ProcessClass::TLog);
if(oldSatelliteTLogFit < newSatelliteTLotFit) return false;
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
RoleFitness newRemoteTLotFit((db.config.remoteTLogReplicationFactor > 0 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForTlogs(db.config, db.config.remoteTLogReplicationFactor, db.config.getDesiredRemoteLogs(), db.config.remoteTLogPolicy, id_used, true, remoteDC) : remote_tlogs, ProcessClass::TLog);
if(oldRemoteTLogFit < newRemoteTLotFit) return false;
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter);
RoleFitness newLogRoutersFit((db.config.remoteTLogReplicationFactor > 0 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, db.config.getDesiredLogRouters(), db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter);
if(oldLogRoutersFit < newLogRoutersFit) return false;
// Check proxy/resolver fitness
InDatacenterFitness oldInFit(dbi.client.proxies, dbi.resolvers, proxyClasses, resolverClasses);
RoleFitness oldInFit(RoleFitness(proxyClasses, ProcessClass::Proxy), RoleFitness(resolverClasses, ProcessClass::Resolver));
auto datacenters = getDatacenters( db.config, true );
InDatacenterFitness newInFit;
for(auto dcId : datacenters) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config, used, true );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, db.config, used, true );
auto first_resolver = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
auto first_proxy = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, db.config.getDesiredProxies()-1, db.config, used, first_proxy, true );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config.getDesiredResolvers()-1, db.config, used, first_resolver, true );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto proxies = getWorkersForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, db.config.getDesiredProxies()-1, db.config, id_used, first_proxy, true );
auto resolvers = getWorkersForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, db.config.getDesiredResolvers()-1, db.config, id_used, first_resolver, true );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = InDatacenterFitness(proxies, resolvers);
if(fitness < newInFit)
newInFit = fitness;
}
RoleFitness newInFit(RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver));
if(oldInFit.betterInDatacenterFitness(newInFit)) return false;
if(oldInFit.betterFitness(newInFit)) return false;
if(oldTLogFit > newTLotFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldTLogFitC", oldTLogFit.tlogCount).detail("newTLotFitC", newTLotFit.tlogCount)
if(oldTLogFit > newTLotFit || oldInFit > newInFit || oldSatelliteTLogFit > newSatelliteTLotFit || oldRemoteTLogFit > newRemoteTLotFit || oldLogRoutersFit > newLogRoutersFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", mworker.fitness)
.detail("oldTLogFitC", oldTLogFit.count).detail("newTLotFitC", newTLotFit.count)
.detail("oldTLogWorstFitT", oldTLogFit.worstFit).detail("newTLotWorstFitT", newTLotFit.worstFit)
.detail("oldTLogBestFitT", oldTLogFit.bestFit).detail("newTLotBestFitT", newTLotFit.bestFit)
.detail("oldInFitP", oldInFit.proxyFit).detail("newInFitP", newInFit.proxyFit)
.detail("oldInFitR", oldInFit.resolverFit).detail("newInFitR", newInFit.resolverFit)
.detail("oldInFitPC", oldInFit.proxyCount).detail("newInFitPC", newInFit.proxyCount)
.detail("oldInFitRC", oldInFit.resolverCount).detail("newInFitRC", newInFit.resolverCount);
.detail("oldInFitW", oldInFit.worstFit).detail("newInFitW", newInFit.worstFit)
.detail("oldInFitB", oldInFit.bestFit).detail("newInFitB", newInFit.bestFit)
.detail("oldInFitC", oldInFit.count).detail("newInFitC", newInFit.count);
return true;
}
@ -881,8 +902,11 @@ public:
bool gotProcessClasses;
bool gotFullyRecoveredConfig;
Optional<Standalone<StringRef>> masterProcessId;
Optional<Standalone<StringRef>> masterDcId;
Optional<Standalone<StringRef>> clusterControllerProcessId;
Optional<Standalone<StringRef>> clusterControllerDcId;
AsyncVar<Optional<vector<Optional<Key>>>> desiredDcIds; //desired DC priorities
AsyncVar<Optional<vector<Optional<Key>>>> changingDcIds; //current DC priorities for everyone other than the cluster controller process
AsyncVar<Optional<vector<Optional<Key>>>> changedDcIds; //current DC priority for the cluster controller process
UID id;
std::vector<RecruitFromConfigurationRequest> outstandingRecruitmentRequests;
std::vector<RecruitRemoteFromConfigurationRequest> outstandingRemoteRecruitmentRequests;
@ -932,18 +956,26 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
try {
state double recoveryStart = now();
TraceEvent("CCWDB", cluster->id).detail("Recruiting", "Master");
state std::pair<WorkerInterface, ProcessClass> masterWorker = cluster->getMasterWorker(db->config);
if( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.second.machineClassFitness( ProcessClass::Master ));
while(!cluster->clusterControllerProcessId.present()) {
Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
//We must recruit the master in the same data center as the cluster controller.
//This should always be possible, because we can recruit the master on the same process as the cluster controller.
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[cluster->clusterControllerProcessId]++;
state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
if( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.worker.second.machineClassFitness( ProcessClass::Master ));
Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
continue;
}
RecruitMasterRequest rmq;
rmq.lifetime = db->serverInfo->get().masterLifetime;
cluster->masterProcessId = masterWorker.first.locality.processId();
cluster->masterDcId = masterWorker.first.locality.dcId();
ErrorOr<MasterInterface> newMaster = wait( masterWorker.first.master.tryGetReply( rmq ) );
cluster->masterProcessId = masterWorker.worker.first.locality.processId();
ErrorOr<MasterInterface> newMaster = wait( masterWorker.worker.first.master.tryGetReply( rmq ) );
if (newMaster.present()) {
TraceEvent("CCWDB", cluster->id).detail("Recruited", newMaster.get().id());
@ -1140,15 +1172,12 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
ACTOR Future<Void> doCheckOutstandingMasterRequests( ClusterControllerData* self ) {
Void _ = wait( delay(SERVER_KNOBS->CHECK_BETTER_MASTER_INTERVAL) );
//FIXME: re-enable betterMasterExists
/*
if (self->betterMasterExists()) {
if (!self->db.forceMasterFailure.isSet()) {
self->db.forceMasterFailure.send( Void() );
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
}
}
*/
return Void();
}
@ -1197,7 +1226,7 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
when( Void _ = wait( failed ) ) { // remove workers that have failed
WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ];
if (!failedWorkerInfo.reply.isSet()) {
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.isExcluded) );
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.priorityInfo) );
}
cluster->id_worker.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
@ -1444,8 +1473,11 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
db->fullyRecoveredConfig = req.configuration.get();
for ( auto& it : self->id_worker ) {
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.interf.address());
if ( it.second.isExcluded != isExcludedFromConfig && !it.second.reply.isSet() ) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, isExcludedFromConfig) );
if ( it.second.priorityInfo.isExcluded != isExcludedFromConfig ) {
it.second.priorityInfo.isExcluded = isExcludedFromConfig;
if( !it.second.reply.isSet() ) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
}
@ -1502,13 +1534,19 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
WorkerInterface w = req.wi;
ProcessClass newProcessClass = req.processClass;
bool newIsExcluded = req.isExcluded;
auto info = self->id_worker.find( w.locality.processId() );
ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo;
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();
if ( w.address() == g_network->getLocalAddress() ) {
self->clusterControllerProcessId = w.locality.processId();
self->clusterControllerDcId = w.locality.dcId();
if(self->changedDcIds.get().present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changedDcIds.get().get() );
}
} else if(self->changingDcIds.get().present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changingDcIds.get().get() );
}
// Check process class and exclusive property
@ -1521,20 +1559,21 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
} else {
newProcessClass = req.initialClass;
}
newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
}
if ( self->gotFullyRecoveredConfig ) {
newIsExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.address());
newPriorityInfo.isExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.address());
}
// Notify the worker to register again with new process class/exclusive property
if ( !req.reply.isSet() && ( newProcessClass != req.processClass || newIsExcluded != req.isExcluded ) ) {
req.reply.send( RegisterWorkerReply(newProcessClass, newIsExcluded) );
if ( !req.reply.isSet() && newPriorityInfo != req.priorityInfo ) {
req.reply.send( RegisterWorkerReply(newProcessClass, newPriorityInfo) );
}
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, req.isExcluded );
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo );
checkOutstandingRequests( self );
return;
}
@ -1545,7 +1584,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
}
info->second.reply = req.reply;
info->second.processClass = newProcessClass;
info->second.isExcluded = req.isExcluded;
info->second.priorityInfo = newPriorityInfo;
info->second.initialClass = req.initialClass;
info->second.gen = req.generation;
@ -1749,10 +1788,12 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {
newProcessClass = w.second.initialClass;
}
if (newProcessClass != w.second.processClass) {
w.second.processClass = newProcessClass;
w.second.priorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
if (!w.second.reply.isSet()) {
w.second.reply.send( RegisterWorkerReply(newProcessClass, w.second.isExcluded) );
w.second.reply.send( RegisterWorkerReply(w.second.processClass, w.second.priorityInfo) );
}
}
}
@ -1808,6 +1849,50 @@ ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db
}
}
ACTOR Future<Void> updatedChangingDatacenters(ClusterControllerData *self) {
//do not change the cluster controller until all the processes have had a chance to register
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
loop {
self->changingDcIds.set(self->desiredDcIds.get());
if(self->changingDcIds.get().present()) {
for ( auto& it : self->id_worker ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changingDcIds.get().get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
}
Void _ = wait(self->desiredDcIds.onChange());
}
}
ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
state Future<Void> changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY);
loop {
choose {
when( Void _ = wait(self->changingDcIds.onChange()) ) { changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY); }
when( Void _ = wait(changeDelay) ) {
changeDelay = Never();
self->changedDcIds.set(self->changingDcIds.get());
if(self->changedDcIds.get().present()) {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->changedDcIds.get().get() );
if ( worker.priorityInfo.dcFitness != fitness ) {
worker.priorityInfo.dcFitness = fitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
}
}
}
}
}
}
}
ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators ) {
state ClusterControllerData self( interf );
state Future<Void> coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
@ -1823,6 +1908,8 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
addActor.send( timeKeeper(&self) );
addActor.send( monitorProcessClasses(&self) );
addActor.send( monitorClientTxnInfoConfigs(&self.db) );
addActor.send( updatedChangingDatacenters(&self) );
addActor.send( updatedChangedDatacenters(&self) );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
loop choose {
@ -1904,14 +1991,14 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
}
}
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo ) {
loop {
state ClusterControllerFullInterface cci;
state bool inRole = false;
cci.initEndpoints();
try {
//Register as a possible leader; wait to be elected
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded );
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncPriorityInfo );
while (!currentCC->get().present() || currentCC->get().get() != cci) {
choose {
@ -1935,12 +2022,12 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
}
}
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo ) {
state bool hasConnected = false;
loop {
try {
ServerCoordinators coordinators( connFile );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded ) );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncPriorityInfo ) );
} catch( Error &e ) {
if( e.code() != error_code_coordinators_changed )
throw; // Expected to terminate fdbserver

@ -143,32 +143,32 @@ struct RecruitStorageRequest {
struct RegisterWorkerReply {
ProcessClass processClass;
bool isExcluded;
ClusterControllerPriorityInfo priorityInfo;
RegisterWorkerReply() {}
RegisterWorkerReply(ProcessClass processClass, bool isExcluded) : processClass(processClass), isExcluded(isExcluded) {}
RegisterWorkerReply() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
RegisterWorkerReply(ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo) : processClass(processClass), priorityInfo(priorityInfo) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & processClass & isExcluded;
ar & processClass & priorityInfo;
}
};
struct RegisterWorkerRequest {
WorkerInterface wi;
ProcessClass processClass;
ProcessClass initialClass;
bool isExcluded;
ProcessClass processClass;
ClusterControllerPriorityInfo priorityInfo;
Generation generation;
ReplyPromise<RegisterWorkerReply> reply;
RegisterWorkerRequest() {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, bool isExcluded, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded), generation(generation) {}
RegisterWorkerRequest() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & wi & initialClass & processClass & isExcluded & generation & reply;
ar & wi & initialClass & processClass & priorityInfo & generation & reply;
}
};

@ -37,7 +37,8 @@ void DatabaseConfiguration::resetInternal() {
primaryDcId = remoteDcId = Optional<Standalone<StringRef>>();
tLogPolicy = storagePolicy = remoteTLogPolicy = satelliteTLogPolicy = IRepPolicyRef();
remoteDesiredTLogCount = remoteTLogReplicationFactor = satelliteDesiredTLogCount = satelliteTLogReplicationFactor = satelliteTLogWriteAntiQuorum = satelliteTLogUsableDcs = logRouterCount = 0;
remoteDesiredTLogCount = satelliteDesiredTLogCount = desiredLogRouterCount = -1;
remoteTLogReplicationFactor = satelliteTLogReplicationFactor = satelliteTLogWriteAntiQuorum = satelliteTLogUsableDcs = 0;
primarySatelliteDcIds.clear();
remoteSatelliteDcIds.clear();
}
@ -87,17 +88,17 @@ bool DatabaseConfiguration::isValid() const {
autoDesiredTLogCount >= 1 &&
storagePolicy &&
tLogPolicy &&
remoteDesiredTLogCount >= 0 &&
getDesiredRemoteLogs() >= 1 &&
getDesiredLogRouters() >= 1 &&
remoteTLogReplicationFactor >= 0 &&
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && primaryDcId.present() && remoteDcId.present() && logRouterCount >= 1 && durableStorageQuorum == storageTeamSize ) ) &&
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && primaryDcId.present() && remoteDcId.present() && durableStorageQuorum == storageTeamSize ) ) &&
primaryDcId.present() == remoteDcId.present() &&
satelliteDesiredTLogCount >= 0 &&
getDesiredSatelliteLogs() >= 1 &&
satelliteTLogReplicationFactor >= 0 &&
satelliteTLogWriteAntiQuorum >= 0 &&
satelliteTLogUsableDcs >= 0 &&
( satelliteTLogReplicationFactor == 0 || ( satelliteTLogPolicy && primarySatelliteDcIds.size() && remoteSatelliteDcIds.size() && remoteTLogReplicationFactor > 0 ) ) &&
primarySatelliteDcIds.size() == remoteSatelliteDcIds.size() &&
logRouterCount >= 0;
primarySatelliteDcIds.size() == remoteSatelliteDcIds.size();
}
std::map<std::string, std::string> DatabaseConfiguration::toMap() const {
@ -244,7 +245,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
else if (ck == LiteralStringRef("remote_dc")) remoteDcId = value;
else if (ck == LiteralStringRef("primary_satellite_dcs")) parse(&primarySatelliteDcIds, value);
else if (ck == LiteralStringRef("remote_satellite_dcs")) parse(&remoteSatelliteDcIds, value);
else if (ck == LiteralStringRef("log_routers")) parse(&logRouterCount, value);
else if (ck == LiteralStringRef("log_routers")) parse(&desiredLogRouterCount, value);
else return false;
return true; // All of the above options currently require recovery to take effect
}

@ -83,7 +83,7 @@ struct DatabaseConfiguration {
// Remote TLogs
int32_t remoteDesiredTLogCount;
int32_t remoteTLogReplicationFactor;
int32_t logRouterCount;
int32_t desiredLogRouterCount;
IRepPolicyRef remoteTLogPolicy;
Optional<Standalone<StringRef>> remoteDcId;
@ -105,6 +105,7 @@ struct DatabaseConfiguration {
int32_t getDesiredLogs() const { if(desiredTLogCount == -1) return autoDesiredTLogCount; return desiredTLogCount; }
int32_t getDesiredSatelliteLogs() const { if(satelliteDesiredTLogCount == -1) return autoDesiredTLogCount; return satelliteDesiredTLogCount; }
int32_t getDesiredRemoteLogs() const { if(remoteDesiredTLogCount == -1) return autoDesiredTLogCount; return remoteDesiredTLogCount; }
int32_t getDesiredLogRouters() const { if(desiredLogRouterCount == -1) return autoDesiredTLogCount; return desiredLogRouterCount; }
bool operator == ( DatabaseConfiguration const& rhs ) const {
const_cast<DatabaseConfiguration*>(this)->makeConfigurationImmutable();

@ -259,6 +259,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( SIM_SHUTDOWN_TIMEOUT, 10 );
init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0;
init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0;
init( CC_CHANGE_DELAY, 0.1 );
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 );
init( ATTEMPT_RECRUITMENT_DELAY, 0.05 );
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
@ -266,6 +267,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::GoodFit );
init( EXPECTED_TLOG_FITNESS, ProcessClass::GoodFit );
init( EXPECTED_LOG_ROUTER_FITNESS, ProcessClass::GoodFit );
init( EXPECTED_PROXY_FITNESS, ProcessClass::GoodFit );
init( EXPECTED_RESOLVER_FITNESS, ProcessClass::GoodFit );
init( RECRUITMENT_TIMEOUT, 600 ); if( randomize && BUGGIFY ) RECRUITMENT_TIMEOUT = g_random->coinflip() ? 60.0 : 1.0;

@ -201,6 +201,7 @@ public:
double SIM_SHUTDOWN_TIMEOUT;
double SHUTDOWN_TIMEOUT;
double MASTER_SPIN_DELAY;
double CC_CHANGE_DELAY;
double WAIT_FOR_GOOD_RECRUITMENT_DELAY;
double ATTEMPT_RECRUITMENT_DELAY;
double WORKER_FAILURE_TIME;
@ -213,6 +214,7 @@ public:
int EXPECTED_MASTER_FITNESS;
int EXPECTED_TLOG_FITNESS;
int EXPECTED_LOG_ROUTER_FITNESS;
int EXPECTED_PROXY_FITNESS;
int EXPECTED_RESOLVER_FITNESS;
double RECRUITMENT_TIMEOUT;

@ -75,7 +75,7 @@ ACTOR Future<Void> changeLeaderCoordinators( ServerCoordinators coordinators, Va
return Void();
}
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo ) {
state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() );
state LeaderInfo myInfo;
state Future<Void> candidacies;
@ -94,7 +94,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
myInfo.changeID = g_random->randomUniqueID();
prevChangeID = myInfo.changeID;
myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
myInfo.updateChangeID( asyncPriorityInfo->get() );
vector<Future<Void>> cand;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++)
@ -153,7 +153,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
break;
}
when (Void _ = wait(candidacies)) { ASSERT(false); }
when (Void _ = wait( asyncProcessClass->onChange() || asyncIsExcluded->onChange() )) {
when (Void _ = wait( asyncPriorityInfo->onChange() )) {
break;
}
}
@ -166,7 +166,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
loop {
prevChangeID = myInfo.changeID;
myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
myInfo.updateChangeID( asyncPriorityInfo->get() );
if (myInfo.changeID != prevChangeID) {
TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID);
}

@ -32,8 +32,7 @@ Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded);
Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo );
// Participates in the given coordination group's leader election process, nominating the given
// LeaderInterface (presumed to be a local interface) as leader. The leader election process is
@ -49,18 +48,17 @@ Future<Void> changeLeaderCoordinators( ServerCoordinators const& coordinators, V
#pragma region Implementation
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo );
template <class LeaderInterface>
Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded)
Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo)
{
Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass, asyncIsExcluded );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncPriorityInfo );
return m || asyncDeserialize( serializedInfo, outKnownLeader );
}

@ -1093,7 +1093,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
state UID recruitmentID = g_random->randomUniqueID();
logSystem->logSystemType = 2;
logSystem->minRouters = configuration.logRouterCount;
logSystem->minRouters = configuration.getDesiredLogRouters();
logSystem->expectedLogSets = 1;
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );

@ -270,8 +270,8 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );
Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, Reference<AsyncVar<bool>> const& asyncIsExcluded, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo );
// These servers are started by workerServer
Future<Void> storageServer(

@ -228,14 +228,14 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Standalone<StringRef> const& dbId,
PromiseStream<Future<Void>> const& addActor
)
: dbgid( myInterface.id() ),
: dbgid(myInterface.id()),
myInterface(myInterface),
dbInfo(dbInfo),
cstate(coordinators, addActor, dbgid),
coordinators(coordinators),
clusterController(clusterController),
dbName( dbName ),
dbId( dbId ),
dbName(dbName),
dbId(dbId),
lastEpochEnd(invalidVersion),
recoveryTransactionVersion(invalidVersion),
lastCommitTime(0),

@ -250,17 +250,17 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
return result;
}
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
state Generation requestGeneration = 0;
state ProcessClass processClass = initialClass;
loop {
Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), asyncIsExcluded->get(), requestGeneration++) ) ) : Never();
Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++) ) ) : Never();
choose {
when ( RegisterWorkerReply reply = wait( registrationReply )) {
asyncProcessClass->set( reply.processClass );
asyncIsExcluded->set( reply.isExcluded );
processClass = reply.processClass;
asyncPriorityInfo->set( reply.priorityInfo );
}
when ( Void _ = wait( ccInterface->onChange() )) { }
}
@ -480,7 +480,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
state PromiseStream< ErrorInfo > errors;
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false);
@ -644,7 +644,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
startRole( interf.id(), interf.id(), "Worker", details );
Void _ = wait(waitForAll(recoveries));
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass, asyncIsExcluded ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass ) );
TraceEvent("RecoveriesComplete", interf.id());
@ -978,15 +978,15 @@ ACTOR Future<Void> fdbd(
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
Reference<AsyncVar<ProcessClass>> asyncProcessClass(new AsyncVar<ProcessClass>(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)));
Reference<AsyncVar<bool>> asyncIsExcluded(new AsyncVar<bool>(false));
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo(new AsyncVar<ClusterControllerPriorityInfo>(
ClusterControllerPriorityInfo(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource).machineClassFitness(ProcessClass::ClusterController), false, ClusterControllerPriorityInfo::FitnessUnknown)));
vector<Future<Void>> v;
if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass, asyncIsExcluded), "clusterController") );
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncPriorityInfo ), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, asyncIsExcluded, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
Void _ = wait( quorum(v,1) );