Change excluded cluster controller

This commit is contained in:
Yichi Chiang 2017-11-14 13:57:37 -08:00
parent d174e05bac
commit df922bc973
9 changed files with 138 additions and 72 deletions

View File

@ -93,6 +93,7 @@ private:
struct LeaderInfo { struct LeaderInfo {
UID changeID; UID changeID;
uint64_t mask = ~(15ll << 60);
Value serializedInfo; Value serializedInfo;
bool forward; // If true, serializedInfo is a connection string instead! bool forward; // If true, serializedInfo is a connection string instead!
@ -103,32 +104,24 @@ struct LeaderInfo {
bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; } bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; }
// The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better // The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better
bool updateChangeID(uint64_t processClassFitness) { void updateChangeID(uint64_t processClassFitness, bool isExcluded) {
uint64_t mask = 15ll << 60; changeID = UID( ( (uint64_t)isExcluded << 63) | (processClassFitness << 60) | (changeID.first() & mask ), changeID.second() );
processClassFitness <<= 60;
if ((changeID.first() & mask) == processClassFitness) {
return false;
} }
changeID = UID((changeID.first() & ~mask) | processClassFitness, changeID.second()); // All but the first 4 bits are used to represent process id
return true; bool equalInternalId(LeaderInfo const& leaderInfo) const {
} if ( (changeID.first() & mask) == (leaderInfo.changeID.first() & mask) && changeID.second() == leaderInfo.changeID.second() ) {
// Change leader only if the candidate has better process class fitness
bool leaderChangeRequired(LeaderInfo const& candidate) const {
uint64_t mask = 15ll << 60;
if ((changeID.first() & mask) > (candidate.changeID.first() & mask)) {
return true; return true;
} else { } else {
return false; return false;
} }
} }
// All but the first 4 bits are used to represent process id // Change leader only if
bool equalInternalId(LeaderInfo const& leaderInfo) const { // 1. the candidate has better process class fitness and the candidate is not the leader
uint64_t mask = ~(15ll << 60); // 2. the leader process class fitness become worse
if ((changeID.first() & mask) == (leaderInfo.changeID.first() & mask)) { bool leaderChangeRequired(LeaderInfo const& candidate) const {
if ( ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate)) ) {
return true; return true;
} else { } else {
return false; return false;

View File

@ -43,19 +43,20 @@ void failAfter( Future<Void> trigger, Endpoint e );
struct WorkerInfo : NonCopyable { struct WorkerInfo : NonCopyable {
Future<Void> watcher; Future<Void> watcher;
ReplyPromise<ProcessClass> reply; ReplyPromise<RegisterWorkerReply> reply;
Generation gen; Generation gen;
int reboots; int reboots;
WorkerInterface interf; WorkerInterface interf;
ProcessClass initialClass; ProcessClass initialClass;
ProcessClass processClass; ProcessClass processClass;
bool isExcluded;
WorkerInfo() : gen(-1), reboots(0) {} WorkerInfo() : gen(-1), reboots(0) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<ProcessClass> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass ) : WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, bool isExcluded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass) {} watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded) {}
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass) {} reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), isExcluded(r.isExcluded) {}
void operator=( WorkerInfo&& r ) noexcept(true) { void operator=( WorkerInfo&& r ) noexcept(true) {
watcher = std::move(r.watcher); watcher = std::move(r.watcher);
reply = std::move(r.reply); reply = std::move(r.reply);
@ -64,6 +65,7 @@ struct WorkerInfo : NonCopyable {
interf = std::move(r.interf); interf = std::move(r.interf);
initialClass = r.initialClass; initialClass = r.initialClass;
processClass = r.processClass; processClass = r.processClass;
isExcluded = r.isExcluded;
} }
}; };
@ -791,6 +793,7 @@ public:
std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database std::map< Optional<Standalone<StringRef>>, ProcessClass > id_class; //contains the mapping from process id to process class from the database
Standalone<RangeResultRef> lastProcessClasses; Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses; bool gotProcessClasses;
bool gotConfiguration;
Optional<Standalone<StringRef>> masterProcessId; Optional<Standalone<StringRef>> masterProcessId;
Optional<Standalone<StringRef>> clusterControllerProcessId; Optional<Standalone<StringRef>> clusterControllerProcessId;
UID id; UID id;
@ -805,7 +808,7 @@ public:
double startTime; double startTime;
explicit ClusterControllerData( ClusterControllerFullInterface ccInterface ) explicit ClusterControllerData( ClusterControllerFullInterface ccInterface )
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), startTime(now()) : id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotConfiguration(false), startTime(now())
{ {
auto serverInfo = db.serverInfo->get(); auto serverInfo = db.serverInfo->get();
serverInfo.id = g_random->randomUniqueID(); serverInfo.id = g_random->randomUniqueID();
@ -1084,7 +1087,7 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
when( Void _ = wait( failed ) ) { // remove workers that have failed when( Void _ = wait( failed ) ) { // remove workers that have failed
WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ]; WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ];
if (!failedWorkerInfo.reply.isSet()) { if (!failedWorkerInfo.reply.isSet()) {
failedWorkerInfo.reply.send( failedWorkerInfo.processClass ); failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.isExcluded) );
} }
cluster->id_worker.erase( worker.locality.processId() ); cluster->id_worker.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() ); cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
@ -1295,7 +1298,17 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
} }
db->masterRegistrationCount = req.registrationCount; db->masterRegistrationCount = req.registrationCount;
if(req.configuration.present()) db->config = req.configuration.get(); if ( req.configuration.present() ) {
db->config = req.configuration.get();
self->gotConfiguration = true;
for ( auto& it : self->id_worker ) {
bool isExcludedFromConfig = db->config.isExcludedServer(it.second.interf.address());
if ( it.second.isExcluded != isExcludedFromConfig && !it.second.reply.isSet() ) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, isExcludedFromConfig) );
}
}
}
bool isChanged = false; bool isChanged = false;
auto dbInfo = self->db.serverInfo->get(); auto dbInfo = self->db.serverInfo->get();
@ -1348,6 +1361,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
WorkerInterface w = req.wi; WorkerInterface w = req.wi;
ProcessClass newProcessClass = req.processClass; ProcessClass newProcessClass = req.processClass;
bool newIsExcluded = req.isExcluded;
auto info = self->id_worker.find( w.locality.processId() ); auto info = self->id_worker.find( w.locality.processId() );
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace(); TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();
@ -1356,8 +1370,9 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->clusterControllerProcessId = w.locality.processId(); self->clusterControllerProcessId = w.locality.processId();
} }
// Check process class if needed // Check process class and exclusive property
if (self->gotProcessClasses && (info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen)) { if ( info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if ( self->gotProcessClasses ) {
auto classIter = self->id_class.find(w.locality.processId()); auto classIter = self->id_class.find(w.locality.processId());
if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) { if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) {
@ -1365,30 +1380,33 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
} else { } else {
newProcessClass = req.initialClass; newProcessClass = req.initialClass;
} }
}
// Notify the worker to register again with new process class if ( self->gotConfiguration ) {
if (newProcessClass != req.processClass && !req.reply.isSet()) { newIsExcluded = self->db.config.isExcludedServer(w.address());
req.reply.send( newProcessClass ); }
// Notify the worker to register again with new process class/exclusive property
if ( !req.reply.isSet() && ( newProcessClass != req.processClass || newIsExcluded != req.isExcluded ) ) {
req.reply.send( RegisterWorkerReply(newProcessClass, newIsExcluded) );
} }
} }
if( info == self->id_worker.end() ) { if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass ); self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, req.isExcluded );
checkOutstandingRequests( self ); checkOutstandingRequests( self );
return; return;
} }
if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) { if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if (info->second.processClass != newProcessClass) {
info->second.processClass = newProcessClass;
}
info->second.initialClass = req.initialClass;
if (!info->second.reply.isSet()) { if (!info->second.reply.isSet()) {
info->second.reply.send( Never() ); info->second.reply.send( Never() );
} }
info->second.reply = req.reply; info->second.reply = req.reply;
info->second.processClass = newProcessClass;
info->second.isExcluded = req.isExcluded;
info->second.initialClass = req.initialClass;
info->second.gen = req.generation; info->second.gen = req.generation;
if(info->second.interf.id() != w.id()) { if(info->second.interf.id() != w.id()) {
@ -1592,7 +1610,7 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {
if (newProcessClass != w.second.processClass) { if (newProcessClass != w.second.processClass) {
w.second.processClass = newProcessClass; w.second.processClass = newProcessClass;
if (!w.second.reply.isSet()) { if (!w.second.reply.isSet()) {
w.second.reply.send( newProcessClass ); w.second.reply.send( RegisterWorkerReply(newProcessClass, w.second.isExcluded) );
} }
} }
} }
@ -1741,14 +1759,14 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
} }
} }
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) { ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
loop { loop {
state ClusterControllerFullInterface cci; state ClusterControllerFullInterface cci;
state bool inRole = false; state bool inRole = false;
cci.initEndpoints(); cci.initEndpoints();
try { try {
//Register as a possible leader; wait to be elected //Register as a possible leader; wait to be elected
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass ); state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded );
while (!currentCC->get().present() || currentCC->get().get() != cci) { while (!currentCC->get().present() || currentCC->get().get() != cci) {
choose { choose {
@ -1772,12 +1790,12 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
} }
} }
ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass) { ACTOR Future<Void> clusterController( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
state bool hasConnected = false; state bool hasConnected = false;
loop { loop {
try { try {
ServerCoordinators coordinators( connFile ); ServerCoordinators coordinators( connFile );
Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass ) ); Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded ) );
} catch( Error &e ) { } catch( Error &e ) {
if( e.code() != error_code_coordinators_changed ) if( e.code() != error_code_coordinators_changed )
throw; // Expected to terminate fdbserver throw; // Expected to terminate fdbserver

View File

@ -112,20 +112,34 @@ struct RecruitStorageRequest {
} }
}; };
struct RegisterWorkerReply {
ProcessClass processClass;
bool isExcluded;
RegisterWorkerReply() {}
RegisterWorkerReply(ProcessClass processClass, bool isExcluded) : processClass(processClass), isExcluded(isExcluded) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & processClass & isExcluded;
}
};
struct RegisterWorkerRequest { struct RegisterWorkerRequest {
WorkerInterface wi; WorkerInterface wi;
ProcessClass processClass; ProcessClass processClass;
ProcessClass initialClass; ProcessClass initialClass;
bool isExcluded;
Generation generation; Generation generation;
ReplyPromise<ProcessClass> reply; ReplyPromise<RegisterWorkerReply> reply;
RegisterWorkerRequest() {} RegisterWorkerRequest() {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, Generation generation) : RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, bool isExcluded, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), generation(generation) {} wi(wi), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded), generation(generation) {}
template <class Ar> template <class Ar>
void serialize( Ar& ar ) { void serialize( Ar& ar ) {
ar & wi & initialClass & processClass & generation & reply; ar & wi & initialClass & processClass & isExcluded & generation & reply;
} }
}; };

View File

@ -250,7 +250,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
} else { } else {
Optional<LeaderInfo> nextNominee; Optional<LeaderInfo> nextNominee;
if (availableLeaders.size() && availableCandidates.size()) { if (availableLeaders.size() && availableCandidates.size()) {
nextNominee = (*availableLeaders.begin()).leaderChangeRequired(*availableCandidates.begin()) ? *availableCandidates.begin() : *availableLeaders.begin(); nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin();
} else if (availableLeaders.size()) { } else if (availableLeaders.size()) {
nextNominee = *availableLeaders.begin(); nextNominee = *availableLeaders.begin();
} else if (availableCandidates.size()) { } else if (availableCandidates.size()) {
@ -259,13 +259,16 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
nextNominee = Optional<LeaderInfo>(); nextNominee = Optional<LeaderInfo>();
} }
if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && !currentNominee.get().equalInternalId(nextNominee.get())) || !availableLeaders.size() ) { if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && currentNominee.get().leaderChangeRequired(nextNominee.get())) || !availableLeaders.size() ) {
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID()) TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
.detail("Changed", nextNominee != currentNominee).detail("Key", printable(key)); .detail("Changed", nextNominee != currentNominee).detail("Key", printable(key));
for(int i=0; i<notify.size(); i++) for(int i=0; i<notify.size(); i++)
notify[i].send( nextNominee ); notify[i].send( nextNominee );
notify.clear(); notify.clear();
currentNominee = nextNominee; currentNominee = nextNominee;
} else if (currentNominee.present() && nextNominee.present() && currentNominee.get().equalInternalId(nextNominee.get())) {
// leader becomes better
currentNominee = nextNominee;
} }
if( availableLeaders.size() ) { if( availableLeaders.size() ) {

View File

@ -75,7 +75,7 @@ ACTOR Future<Void> changeLeaderCoordinators( ServerCoordinators coordinators, Va
return Void(); return Void();
} }
ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass ) { ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ProcessClass>> asyncProcessClass, Reference<AsyncVar<bool>> asyncIsExcluded ) {
state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() ); state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() );
state LeaderInfo myInfo; state LeaderInfo myInfo;
state Future<Void> candidacies; state Future<Void> candidacies;
@ -94,7 +94,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
myInfo.changeID = g_random->randomUniqueID(); myInfo.changeID = g_random->randomUniqueID();
prevChangeID = myInfo.changeID; prevChangeID = myInfo.changeID;
myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController)); myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
vector<Future<Void>> cand; vector<Future<Void>> cand;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++) for(int i=0; i<coordinators.leaderElectionServers.size(); i++)
@ -153,7 +153,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
break; break;
} }
when (Void _ = wait(candidacies)) { ASSERT(false); } when (Void _ = wait(candidacies)) { ASSERT(false); }
when (Void _ = wait( asyncProcessClass->onChange() )) { when (Void _ = wait( asyncProcessClass->onChange() || asyncIsExcluded->onChange() )) {
break; break;
} }
} }
@ -166,7 +166,8 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
loop { loop {
prevChangeID = myInfo.changeID; prevChangeID = myInfo.changeID;
if (myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController))) { myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() );
if (myInfo.changeID != prevChangeID) {
TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID); TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID);
} }

View File

@ -32,7 +32,8 @@ Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface, LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected, bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass); Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded);
// Participates in the given coordination group's leader election process, nominating the given // Participates in the given coordination group's leader election process, nominating the given
// LeaderInterface (presumed to be a local interface) as leader. The leader election process is // LeaderInterface (presumed to be a local interface) as leader. The leader election process is
@ -48,17 +49,18 @@ Future<Void> changeLeaderCoordinators( ServerCoordinators const& coordinators, V
#pragma region Implementation #pragma region Implementation
Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass ); Future<Void> tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference<AsyncVar<Value>> const& outSerializedLeader, bool const& hasConnected, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
template <class LeaderInterface> template <class LeaderInterface>
Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators, Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
LeaderInterface const& proposedInterface, LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected, bool hasConnected,
Reference<AsyncVar<ProcessClass>> const& asyncProcessClass) Reference<AsyncVar<ProcessClass>> const& asyncProcessClass,
Reference<AsyncVar<bool>> const& asyncIsExcluded)
{ {
Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> ); Reference<AsyncVar<Value>> serializedInfo( new AsyncVar<Value> );
Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass ); Future<Void> m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass, asyncIsExcluded );
return m || asyncDeserialize( serializedInfo, outKnownLeader ); return m || asyncDeserialize( serializedInfo, outKnownLeader );
} }

View File

@ -256,8 +256,8 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b ); Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );
Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix ); Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix ); Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, Reference<AsyncVar<bool>> const& asyncIsExcluded, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass ); Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, Reference<AsyncVar<bool>> const& asyncIsExcluded );
// These servers are started by workerServer // These servers are started by workerServer
Future<Void> storageServer( Future<Void> storageServer(

View File

@ -954,6 +954,37 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
} }
} }
ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true);
loop {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Future<Standalone<RangeResultRef>> fresults = tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY );
Void _ = wait( success(fresults) );
Standalone<RangeResultRef> results = fresults.get();
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration conf;
conf.fromKeyValues((VectorRef<KeyValueRef>) results);
if(conf != self->configuration) {
self->configuration = conf;
self->registrationTrigger.trigger();
}
state Future<Void> watchFuture = tr.watch(excludedServersVersionKey);
Void _ = wait(tr.commit());
Void _ = wait(watchFuture);
break;
} catch (Error& e) {
Void _ = wait( tr.onError(e) );
}
}
}
}
ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor ) ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
{ {
state TraceInterval recoveryInterval("MasterRecovery"); state TraceInterval recoveryInterval("MasterRecovery");
@ -1073,6 +1104,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
state Future<Void> resolverFailure = waitResolverFailure( self->resolvers ); state Future<Void> resolverFailure = waitResolverFailure( self->resolvers );
state Future<Void> proxyFailure = waitProxyFailure( self->proxies ); state Future<Void> proxyFailure = waitProxyFailure( self->proxies );
state Future<Void> providingVersions = provideVersions(self); state Future<Void> providingVersions = provideVersions(self);
state Future<Void> configMonitor = configurationMonitor( self );
addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) ); addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) );
self->registrationTrigger.trigger(); self->registrationTrigger.trigger();
@ -1157,6 +1189,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
when( Void _ = wait( proxyFailure ) ) { throw internal_error(); } when( Void _ = wait( proxyFailure ) ) { throw internal_error(); }
when( Void _ = wait( resolverFailure ) ) { throw internal_error(); } when( Void _ = wait( resolverFailure ) ) { throw internal_error(); }
when( Void _ = wait( providingVersions ) ) { throw internal_error(); } when( Void _ = wait( providingVersions ) ) { throw internal_error(); }
when( Void _ = wait( configMonitor ) ) { throw internal_error(); }
} }
} }

View File

@ -250,16 +250,17 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
return result; return result;
} }
ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass ) { ACTOR Future<Void> registrationClient( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, WorkerInterface interf, Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists // Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register) // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
state Generation requestGeneration = 0; state Generation requestGeneration = 0;
loop { loop {
Future<ProcessClass> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), requestGeneration++) ) ) : Never(); Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), asyncIsExcluded->get(), requestGeneration++) ) ) : Never();
choose { choose {
when ( ProcessClass newProcessClass = wait( registrationReply )) { when ( RegisterWorkerReply reply = wait( registrationReply )) {
asyncProcessClass->set(newProcessClass); asyncProcessClass->set( reply.processClass );
asyncIsExcluded->set( reply.isExcluded );
} }
when ( Void _ = wait( ccInterface->onChange() )) { } when ( Void _ = wait( ccInterface->onChange() )) { }
} }
@ -494,7 +495,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
} }
ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities, ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, LocalityData localities,
Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) { Reference<AsyncVar<ProcessClass>> asyncProcessClass, ProcessClass initialClass, Reference<AsyncVar<bool>> asyncIsExcluded, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) {
state PromiseStream< ErrorInfo > errors; state PromiseStream< ErrorInfo > errors;
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false); state ActorCollection errorForwarders(false);
@ -657,7 +658,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
startRole( interf.id(), interf.id(), "Worker", details ); startRole( interf.id(), interf.id(), "Worker", details );
Void _ = wait(waitForAll(recoveries)); Void _ = wait(waitForAll(recoveries));
errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass ) ); errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass, asyncIsExcluded ) );
TraceEvent("RecoveriesComplete", interf.id()); TraceEvent("RecoveriesComplete", interf.id());
@ -972,13 +973,14 @@ ACTOR Future<Void> fdbd(
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> ); Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc( new AsyncVar<Optional<ClusterControllerFullInterface>> );
Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> ); Reference<AsyncVar<Optional<ClusterInterface>>> ci( new AsyncVar<Optional<ClusterInterface>> );
Reference<AsyncVar<ProcessClass>> asyncProcessClass(new AsyncVar<ProcessClass>(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource))); Reference<AsyncVar<ProcessClass>> asyncProcessClass(new AsyncVar<ProcessClass>(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)));
Reference<AsyncVar<bool>> asyncIsExcluded(new AsyncVar<bool>(false));
vector<Future<Void>> v; vector<Future<Void>> v;
if ( coordFolder.size() ) if ( coordFolder.size() )
v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files
v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass), "clusterController") ); v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass, asyncIsExcluded), "clusterController") );
v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") ); v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") );
v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") ); v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") );
v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) ); v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, asyncIsExcluded, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) );
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" ); state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
Void _ = wait( quorum(v,1) ); Void _ = wait( quorum(v,1) );