diff --git a/fdbclient/CoordinationInterface.h b/fdbclient/CoordinationInterface.h index 8be0ce9e89..84f29aeca9 100644 --- a/fdbclient/CoordinationInterface.h +++ b/fdbclient/CoordinationInterface.h @@ -93,6 +93,7 @@ private: struct LeaderInfo { UID changeID; + uint64_t mask = ~(15ll << 60); Value serializedInfo; bool forward; // If true, serializedInfo is a connection string instead! @@ -103,32 +104,24 @@ struct LeaderInfo { bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; } // The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better - bool updateChangeID(uint64_t processClassFitness) { - uint64_t mask = 15ll << 60; - processClassFitness <<= 60; - - if ((changeID.first() & mask) == processClassFitness) { - return false; - } - - changeID = UID((changeID.first() & ~mask) | processClassFitness, changeID.second()); - return true; + void updateChangeID(uint64_t processClassFitness, bool isExcluded) { + changeID = UID( ( (uint64_t)isExcluded << 63) | (processClassFitness << 60) | (changeID.first() & mask ), changeID.second() ); } - // Change leader only if the candidate has better process class fitness - bool leaderChangeRequired(LeaderInfo const& candidate) const { - uint64_t mask = 15ll << 60; - if ((changeID.first() & mask) > (candidate.changeID.first() & mask)) { + // All but the first 4 bits are used to represent process id + bool equalInternalId(LeaderInfo const& leaderInfo) const { + if ( (changeID.first() & mask) == (leaderInfo.changeID.first() & mask) && changeID.second() == leaderInfo.changeID.second() ) { return true; } else { return false; } } - // All but the first 4 bits are used to represent process id - bool equalInternalId(LeaderInfo const& leaderInfo) const { - uint64_t mask = ~(15ll << 60); - if ((changeID.first() & mask) == (leaderInfo.changeID.first() & mask)) { + // Change leader only if + // 1. the candidate has better process class fitness and the candidate is not the leader + // 2. the leader process class fitness become worse + bool leaderChangeRequired(LeaderInfo const& candidate) const { + if ( ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate)) ) { return true; } else { return false; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index e6f326437e..9face5eb9a 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -43,19 +43,20 @@ void failAfter( Future trigger, Endpoint e ); struct WorkerInfo : NonCopyable { Future watcher; - ReplyPromise reply; + ReplyPromise reply; Generation gen; int reboots; WorkerInterface interf; ProcessClass initialClass; ProcessClass processClass; + bool isExcluded; WorkerInfo() : gen(-1), reboots(0) {} - WorkerInfo( Future watcher, ReplyPromise reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass ) : - watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass) {} + WorkerInfo( Future watcher, ReplyPromise reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, bool isExcluded ) : + watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded) {} WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), - reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass) {} + reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), isExcluded(r.isExcluded) {} void operator=( WorkerInfo&& r ) noexcept(true) { watcher = std::move(r.watcher); reply = std::move(r.reply); @@ -64,6 +65,7 @@ struct WorkerInfo : NonCopyable { interf = std::move(r.interf); initialClass = r.initialClass; processClass = r.processClass; + isExcluded = r.isExcluded; } }; @@ -791,6 +793,7 @@ public: std::map< Optional>, ProcessClass > id_class; //contains the mapping from process id to process class from the database Standalone lastProcessClasses; bool gotProcessClasses; + bool gotConfiguration; Optional> masterProcessId; Optional> clusterControllerProcessId; UID id; @@ -805,7 +808,7 @@ public: double startTime; explicit ClusterControllerData( ClusterControllerFullInterface ccInterface ) - : id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), startTime(now()) + : id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotConfiguration(false), startTime(now()) { auto serverInfo = db.serverInfo->get(); serverInfo.id = g_random->randomUniqueID(); @@ -1084,7 +1087,7 @@ ACTOR Future workerAvailabilityWatch( WorkerInterface worker, ProcessClass when( Void _ = wait( failed ) ) { // remove workers that have failed WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ]; if (!failedWorkerInfo.reply.isSet()) { - failedWorkerInfo.reply.send( failedWorkerInfo.processClass ); + failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.isExcluded) ); } cluster->id_worker.erase( worker.locality.processId() ); cluster->updateWorkerList.set( worker.locality.processId(), Optional() ); @@ -1295,7 +1298,17 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c } db->masterRegistrationCount = req.registrationCount; - if(req.configuration.present()) db->config = req.configuration.get(); + if ( req.configuration.present() ) { + db->config = req.configuration.get(); + self->gotConfiguration = true; + + for ( auto& it : self->id_worker ) { + bool isExcludedFromConfig = db->config.isExcludedServer(it.second.interf.address()); + if ( it.second.isExcluded != isExcludedFromConfig && !it.second.reply.isSet() ) { + it.second.reply.send( RegisterWorkerReply( it.second.processClass, isExcludedFromConfig) ); + } + } + } bool isChanged = false; auto dbInfo = self->db.serverInfo->get(); @@ -1348,6 +1361,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { WorkerInterface w = req.wi; ProcessClass newProcessClass = req.processClass; + bool newIsExcluded = req.isExcluded; auto info = self->id_worker.find( w.locality.processId() ); TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace(); @@ -1356,39 +1370,43 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { self->clusterControllerProcessId = w.locality.processId(); } - // Check process class if needed - if (self->gotProcessClasses && (info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen)) { - auto classIter = self->id_class.find(w.locality.processId()); - - if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) { - newProcessClass = classIter->second; - } else { - newProcessClass = req.initialClass; + // Check process class and exclusive property + if ( info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen ) { + if ( self->gotProcessClasses ) { + auto classIter = self->id_class.find(w.locality.processId()); + + if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) { + newProcessClass = classIter->second; + } else { + newProcessClass = req.initialClass; + } } - // Notify the worker to register again with new process class - if (newProcessClass != req.processClass && !req.reply.isSet()) { - req.reply.send( newProcessClass ); + if ( self->gotConfiguration ) { + newIsExcluded = self->db.config.isExcludedServer(w.address()); + } + + // Notify the worker to register again with new process class/exclusive property + if ( !req.reply.isSet() && ( newProcessClass != req.processClass || newIsExcluded != req.isExcluded ) ) { + req.reply.send( RegisterWorkerReply(newProcessClass, newIsExcluded) ); } } if( info == self->id_worker.end() ) { - self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass ); + self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, req.isExcluded ); checkOutstandingRequests( self ); return; } if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) { - if (info->second.processClass != newProcessClass) { - info->second.processClass = newProcessClass; - } - - info->second.initialClass = req.initialClass; if (!info->second.reply.isSet()) { info->second.reply.send( Never() ); } info->second.reply = req.reply; + info->second.processClass = newProcessClass; + info->second.isExcluded = req.isExcluded; + info->second.initialClass = req.initialClass; info->second.gen = req.generation; if(info->second.interf.id() != w.id()) { @@ -1592,7 +1610,7 @@ ACTOR Future monitorProcessClasses(ClusterControllerData *self) { if (newProcessClass != w.second.processClass) { w.second.processClass = newProcessClass; if (!w.second.reply.isSet()) { - w.second.reply.send( newProcessClass ); + w.second.reply.send( RegisterWorkerReply(newProcessClass, w.second.isExcluded) ); } } } @@ -1741,14 +1759,14 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, } } -ACTOR Future clusterController( ServerCoordinators coordinators, Reference>> currentCC, bool hasConnected, Reference> asyncProcessClass ) { +ACTOR Future clusterController( ServerCoordinators coordinators, Reference>> currentCC, bool hasConnected, Reference> asyncProcessClass, Reference> asyncIsExcluded ) { loop { state ClusterControllerFullInterface cci; state bool inRole = false; cci.initEndpoints(); try { //Register as a possible leader; wait to be elected - state Future leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass ); + state Future leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded ); while (!currentCC->get().present() || currentCC->get().get() != cci) { choose { @@ -1772,12 +1790,12 @@ ACTOR Future clusterController( ServerCoordinators coordinators, Reference } } -ACTOR Future clusterController( Reference connFile, Reference>> currentCC, Reference> asyncProcessClass) { +ACTOR Future clusterController( Reference connFile, Reference>> currentCC, Reference> asyncProcessClass, Reference> asyncIsExcluded) { state bool hasConnected = false; loop { try { ServerCoordinators coordinators( connFile ); - Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass ) ); + Void _ = wait( clusterController( coordinators, currentCC, hasConnected, asyncProcessClass, asyncIsExcluded ) ); } catch( Error &e ) { if( e.code() != error_code_coordinators_changed ) throw; // Expected to terminate fdbserver diff --git a/fdbserver/ClusterRecruitmentInterface.h b/fdbserver/ClusterRecruitmentInterface.h index 18d8e01c5e..d4f4458b4e 100644 --- a/fdbserver/ClusterRecruitmentInterface.h +++ b/fdbserver/ClusterRecruitmentInterface.h @@ -112,20 +112,34 @@ struct RecruitStorageRequest { } }; +struct RegisterWorkerReply { + ProcessClass processClass; + bool isExcluded; + + RegisterWorkerReply() {} + RegisterWorkerReply(ProcessClass processClass, bool isExcluded) : processClass(processClass), isExcluded(isExcluded) {} + + template + void serialize( Ar& ar ) { + ar & processClass & isExcluded; + } +}; + struct RegisterWorkerRequest { WorkerInterface wi; ProcessClass processClass; ProcessClass initialClass; + bool isExcluded; Generation generation; - ReplyPromise reply; + ReplyPromise reply; RegisterWorkerRequest() {} - RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, Generation generation) : - wi(wi), initialClass(initialClass), processClass(processClass), generation(generation) {} + RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, bool isExcluded, Generation generation) : + wi(wi), initialClass(initialClass), processClass(processClass), isExcluded(isExcluded), generation(generation) {} template void serialize( Ar& ar ) { - ar & wi & initialClass & processClass & generation & reply; + ar & wi & initialClass & processClass & isExcluded & generation & reply; } }; diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 3945a535ea..15186056c4 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -250,7 +250,7 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { } else { Optional nextNominee; if (availableLeaders.size() && availableCandidates.size()) { - nextNominee = (*availableLeaders.begin()).leaderChangeRequired(*availableCandidates.begin()) ? *availableCandidates.begin() : *availableLeaders.begin(); + nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin(); } else if (availableLeaders.size()) { nextNominee = *availableLeaders.begin(); } else if (availableCandidates.size()) { @@ -259,13 +259,16 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { nextNominee = Optional(); } - if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && !currentNominee.get().equalInternalId(nextNominee.get())) || !availableLeaders.size() ) { + if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && currentNominee.get().leaderChangeRequired(nextNominee.get())) || !availableLeaders.size() ) { TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID()) .detail("Changed", nextNominee != currentNominee).detail("Key", printable(key)); for(int i=0; i changeLeaderCoordinators( ServerCoordinators coordinators, Va return Void(); } -ACTOR Future tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference> outSerializedLeader, bool hasConnected, Reference> asyncProcessClass ) { +ACTOR Future tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference> outSerializedLeader, bool hasConnected, Reference> asyncProcessClass, Reference> asyncIsExcluded ) { state Reference>>> nominees( new AsyncVar>>() ); state LeaderInfo myInfo; state Future candidacies; @@ -94,7 +94,7 @@ ACTOR Future tryBecomeLeaderInternal( ServerCoordinators coordinators, Val myInfo.changeID = g_random->randomUniqueID(); prevChangeID = myInfo.changeID; - myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController)); + myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() ); vector> cand; for(int i=0; i tryBecomeLeaderInternal( ServerCoordinators coordinators, Val break; } when (Void _ = wait(candidacies)) { ASSERT(false); } - when (Void _ = wait( asyncProcessClass->onChange() )) { + when (Void _ = wait( asyncProcessClass->onChange() || asyncIsExcluded->onChange() )) { break; } } @@ -166,7 +166,8 @@ ACTOR Future tryBecomeLeaderInternal( ServerCoordinators coordinators, Val loop { prevChangeID = myInfo.changeID; - if (myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController))) { + myInfo.updateChangeID( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController), asyncIsExcluded->get() ); + if (myInfo.changeID != prevChangeID) { TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID); } diff --git a/fdbserver/LeaderElection.h b/fdbserver/LeaderElection.h index da05fb0fd9..1bb1e15cf1 100644 --- a/fdbserver/LeaderElection.h +++ b/fdbserver/LeaderElection.h @@ -32,7 +32,8 @@ Future tryBecomeLeader( ServerCoordinators const& coordinators, LeaderInterface const& proposedInterface, Reference>> const& outKnownLeader, bool hasConnected, - Reference> const& asyncProcessClass); + Reference> const& asyncProcessClass, + Reference> const& asyncIsExcluded); // Participates in the given coordination group's leader election process, nominating the given // LeaderInterface (presumed to be a local interface) as leader. The leader election process is @@ -48,17 +49,18 @@ Future changeLeaderCoordinators( ServerCoordinators const& coordinators, V #pragma region Implementation -Future tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference> const& outSerializedLeader, bool const& hasConnected, Reference> const& asyncProcessClass ); +Future tryBecomeLeaderInternal( ServerCoordinators const& coordinators, Value const& proposedSerializedInterface, Reference> const& outSerializedLeader, bool const& hasConnected, Reference> const& asyncProcessClass, Reference> const& asyncIsExcluded ); template Future tryBecomeLeader( ServerCoordinators const& coordinators, LeaderInterface const& proposedInterface, Reference>> const& outKnownLeader, bool hasConnected, - Reference> const& asyncProcessClass) + Reference> const& asyncProcessClass, + Reference> const& asyncIsExcluded) { Reference> serializedInfo( new AsyncVar ); - Future m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass ); + Future m = tryBecomeLeaderInternal( coordinators, BinaryWriter::toValue(proposedInterface, IncludeVersion()), serializedInfo, hasConnected, asyncProcessClass, asyncIsExcluded ); return m || asyncDeserialize( serializedInfo, outKnownLeader ); } diff --git a/fdbserver/WorkerInterface.h b/fdbserver/WorkerInterface.h index a65be32bc6..dd28fea05a 100644 --- a/fdbserver/WorkerInterface.h +++ b/fdbserver/WorkerInterface.h @@ -256,8 +256,8 @@ class Database openDBOnServer( Reference> const& db, int Future extractClusterInterface( Reference>> const& a, Reference>> const& b ); Future fdbd( Reference const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix ); -Future workerServer( Reference const&, Reference>> const& ccInterface, LocalityData const& localities, Reference> const& asyncProcessClass, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix ); -Future clusterController( Reference const&, Reference>> const& currentCC, Reference> const& asyncProcessClass ); +Future workerServer( Reference const&, Reference>> const& ccInterface, LocalityData const& localities, Reference> const& asyncProcessClass, ProcessClass const& initialClass, Reference> const& asyncIsExcluded, std::string const& filename, int64_t const& memoryLimit, Future const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix ); +Future clusterController( Reference const&, Reference>> const& currentCC, Reference> const& asyncProcessClass, Reference> const& asyncIsExcluded ); // These servers are started by workerServer Future storageServer( diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 2ffa4e78bd..db9a874c40 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -954,6 +954,37 @@ ACTOR Future trackTlogRecovery( Reference self, Reference configurationMonitor( Reference self ) { + state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true); + loop { + state ReadYourWritesTransaction tr(cx); + + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + state Future> fresults = tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ); + Void _ = wait( success(fresults) ); + Standalone results = fresults.get(); + ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY ); + + DatabaseConfiguration conf; + conf.fromKeyValues((VectorRef) results); + if(conf != self->configuration) { + self->configuration = conf; + self->registrationTrigger.trigger(); + } + + state Future watchFuture = tr.watch(excludedServersVersionKey); + Void _ = wait(tr.commit()); + Void _ = wait(watchFuture); + break; + } catch (Error& e) { + Void _ = wait( tr.onError(e) ); + } + } + } +} + ACTOR Future masterCore( Reference self, PromiseStream> addActor ) { state TraceInterval recoveryInterval("MasterRecovery"); @@ -1073,6 +1104,7 @@ ACTOR Future masterCore( Reference self, PromiseStream resolverFailure = waitResolverFailure( self->resolvers ); state Future proxyFailure = waitProxyFailure( self->proxies ); state Future providingVersions = provideVersions(self); + state Future configMonitor = configurationMonitor( self ); addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) ); self->registrationTrigger.trigger(); @@ -1156,7 +1188,8 @@ ACTOR Future masterCore( Reference self, PromiseStream getDiskStores( std::string folder ) { return result; } -ACTOR Future registrationClient( Reference>> ccInterface, WorkerInterface interf, Reference> asyncProcessClass, ProcessClass initialClass ) { +ACTOR Future registrationClient( Reference>> ccInterface, WorkerInterface interf, Reference> asyncProcessClass, ProcessClass initialClass, Reference> asyncIsExcluded) { // Keeps the cluster controller (as it may be re-elected) informed that this worker exists // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register) state Generation requestGeneration = 0; loop { - Future registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), requestGeneration++) ) ) : Never(); + Future registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply( RegisterWorkerRequest(interf, initialClass, asyncProcessClass->get(), asyncIsExcluded->get(), requestGeneration++) ) ) : Never(); choose { - when ( ProcessClass newProcessClass = wait( registrationReply )) { - asyncProcessClass->set(newProcessClass); + when ( RegisterWorkerReply reply = wait( registrationReply )) { + asyncProcessClass->set( reply.processClass ); + asyncIsExcluded->set( reply.isExcluded ); } when ( Void _ = wait( ccInterface->onChange() )) { } } @@ -494,7 +495,7 @@ ACTOR Future monitorServerDBInfo( Reference workerServer( Reference connFile, Reference>> ccInterface, LocalityData localities, - Reference> asyncProcessClass, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) { + Reference> asyncProcessClass, ProcessClass initialClass, Reference> asyncIsExcluded, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix ) { state PromiseStream< ErrorInfo > errors; state Future handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last state ActorCollection errorForwarders(false); @@ -657,7 +658,7 @@ ACTOR Future workerServer( Reference connFile, Refe startRole( interf.id(), interf.id(), "Worker", details ); Void _ = wait(waitForAll(recoveries)); - errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass ) ); + errorForwarders.add( registrationClient( ccInterface, interf, asyncProcessClass, initialClass, asyncIsExcluded ) ); TraceEvent("RecoveriesComplete", interf.id()); @@ -972,13 +973,14 @@ ACTOR Future fdbd( Reference>> cc( new AsyncVar> ); Reference>> ci( new AsyncVar> ); Reference> asyncProcessClass(new AsyncVar(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource))); + Reference> asyncIsExcluded(new AsyncVar(false)); vector> v; if ( coordFolder.size() ) v.push_back( fileNotFoundToNever( coordinationServer( coordFolder ) ) ); //SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files - v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass), "clusterController") ); + v.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncProcessClass, asyncIsExcluded), "clusterController") ); v.push_back( reportErrors(extractClusterInterface( cc, ci ), "extractClusterInterface") ); v.push_back( reportErrors(failureMonitorClient( ci, true ), "failureMonitorClient") ); - v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) ); + v.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncProcessClass, processClass, asyncIsExcluded, dataFolder, memoryLimit, metricsConnFile, metricsPrefix), "workerServer", UID(), &normalWorkerErrors()) ); state Future firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" ); Void _ = wait( quorum(v,1) );