diff --git a/fdbserver/Backup.actor.cpp b/fdbserver/Backup.actor.cpp index 640736e1fc..ba0ad7e62a 100644 --- a/fdbserver/Backup.actor.cpp +++ b/fdbserver/Backup.actor.cpp @@ -110,10 +110,8 @@ ACTOR Future checkRemoved(Reference> db, uint64_t r } } -ACTOR Future backupWorker( - BackupInterface interf, InitializeBackupRequest req, - Reference> db) -{ +ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest req, + Reference> db) { state BackupData self(interf.id(), req); state PromiseStream> addActor; state Future error = actorCollection(addActor.getFuture()); @@ -122,6 +120,7 @@ ACTOR Future backupWorker( TraceEvent("BackupWorkerStart", interf.id()); try { addActor.send(pullAsyncData(&self)); + addActor.send(waitFailureServer(interf.waitFailure.getFuture())); loop choose { when(wait(dbInfoChange)) { diff --git a/fdbserver/BackupInterface.h b/fdbserver/BackupInterface.h index 63a029efd4..bf3903bc7f 100644 --- a/fdbserver/BackupInterface.h +++ b/fdbserver/BackupInterface.h @@ -25,13 +25,14 @@ #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" +// The interface for backup workers. struct BackupInterface { RequestStream> waitFailure; RequestStream haltBackup; struct LocalityData locality; UID myId; - BackupInterface() {} + BackupInterface() = default; explicit BackupInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {} void initEndpoints() {} diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index fd0d9bf5ac..832ebcdd23 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -355,6 +355,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( LAST_LIMITED_RATIO, 2.0 ); // Backup Worker + init( BACKUP_TIMEOUT, 0.4 ); init( BACKUP_FAILURE_TIME, 1.0 ); init( WAIT_FOR_BACKUP_JOIN_DELAY, 1.0 ); diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 8c672e0111..aac6c66191 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -41,6 +41,7 @@ class LogSet : NonCopyable, public ReferenceCounted { public: std::vector>>> logServers; std::vector>>> logRouters; + std::vector>>> backupWorkers; int32_t tLogWriteAntiQuorum; int32_t tLogReplicationFactor; std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index f19a4c2e45..4fe5826a81 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -96,7 +96,7 @@ struct TLogSet { explicit TLogSet(const LogSet& rhs); std::string toString() const { - return format("anti: %d replication: %d local: %d routers: %d tLogs: %s backupWorkers: %s locality: %d", + return format("anti: %d replication: %d local: %d routers: %d tLogs: %s backupWorkers: %d locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(), backupWorkers.size(), locality); } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 72863af4f7..4ca4595d62 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -81,12 +81,15 @@ LogSet::LogSet(const TLogSet& tLogSet) : locality(tLogSet.locality), startVersion(tLogSet.startVersion), satelliteTagLocations(tLogSet.satelliteTagLocations) { - for(const auto& log : tLogSet.tLogs) { + for (const auto& log : tLogSet.tLogs) { logServers.emplace_back(new AsyncVar>(log)); } - for(const auto& log : tLogSet.logRouters) { + for (const auto& log : tLogSet.logRouters) { logRouters.emplace_back(new AsyncVar>(log)); } + for (const auto& log : tLogSet.backupWorkers) { + backupWorkers.emplace_back(new AsyncVar>(log)); + } filterLocalityDataForPolicy(tLogPolicy, &tLogLocalities); updateLocalitySet(tLogLocalities); } @@ -117,10 +120,12 @@ TLogSet::TLogSet(const LogSet& rhs) : for (const auto& tlog : rhs.logServers) { tLogs.push_back(tlog->get()); } - for (const auto& logRouter : rhs.logRouters) { logRouters.push_back(logRouter->get()); } + for (const auto& worker : rhs.backupWorkers) { + backupWorkers.push_back(worker->get()); + } } OldTLogConf::OldTLogConf(const OldLogData& oldLogData) : @@ -399,6 +404,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedonChange()); } } + for (const auto& worker : it->backupWorkers) { + if (worker->get().present()) { + failed.push_back(waitFailureClient( + worker->get().interf().waitFailure, SERVER_KNOBS->BACKUP_TIMEOUT, + -SERVER_KNOBS->BACKUP_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY)); + } else { + changes.push_back(worker->onChange()); + } + } } if(!self->recoveryCompleteWrittenToCoreState.get()) { @@ -1833,6 +1847,30 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted newBackupWorkers(Reference logSet, RecruitFromConfigurationReply recruits, + LogEpoch recoveryCount) { + std::vector> initializationReplies; + for (const auto& worker : recruits.backupWorkers) { + InitializeBackupRequest req(g_random->randomUniqueID()); + req.recoveryCount = recoveryCount; + req.startVersion = logSet->startVersion; + TraceEvent("BackupRecruitment").detail("WorkerID", worker.id()).detail("RecoveryCount", recoveryCount) + .detail("StartVersion", req.startVersion); + initializationReplies.push_back(transformErrors( + throwErrorOr(worker.backup.getReplyUnlessFailedFor(req, SERVER_KNOBS->BACKUP_TIMEOUT, + SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)), + master_recovery_failed())); + } + + std::vector newRecruits = wait(getAll(initializationReplies)); + for (const auto& interf : newRecruits) { + logSet->backupWorkers.emplace_back( + new AsyncVar>(OptionalInterface(interf))); + } + + return Void(); + } + ACTOR static Future newRemoteEpoch( TagPartitionedLogSystem* self, Reference oldLogSystem, Future fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality, std::vector allTags ) { TraceEvent("RemoteLogRecruitment_WaitingForWorkers"); state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers ); @@ -2162,6 +2200,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedTLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + state Future recruitBackup = newBackupWorkers(logSystem->tLogs[0], recr, recoveryCount); state std::vector> recoveryComplete; if(region.satelliteTLogReplicationFactor > 0) { @@ -2235,6 +2274,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0]->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(initializationReplies[i].get()) ) ); diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 84f9dce281..c4889398bc 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -197,7 +197,6 @@ struct MasterData : NonCopyable, ReferenceCounted { std::vector proxies; std::vector provisionalProxies; std::vector resolvers; - std::vector backupWorkers; std::map lastProxyVersionReplies; @@ -339,23 +338,6 @@ ACTOR Future newTLogServers( Reference self, RecruitFromConfig return Void(); } -ACTOR Future newBackupWorkers(Reference self, RecruitFromConfigurationReply recruits) { - std::vector> initializationReplies; - for (int i = 0; i < recruits.backupWorkers.size(); i++) { - InitializeBackupRequest req(g_random->randomUniqueID()); - TraceEvent("BackupReplies", self->dbgid).detail("WorkerID", recruits.backupWorkers[i].id()); - initializationReplies.push_back( - transformErrors(throwErrorOr(recruits.backupWorkers[i].backup.getReplyUnlessFailedFor( - req, SERVER_KNOBS->BACKUP_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)), - master_recovery_failed())); - } - - std::vector newRecruits = wait(getAll(initializationReplies)); - self->backupWorkers = newRecruits; - - return Void(); -} - ACTOR Future newSeedServers( Reference self, RecruitFromConfigurationReply recruits, vector* servers ) { // This is only necessary if the database is at version 0 servers->clear(); @@ -625,7 +607,7 @@ ACTOR Future>> recruitEverything( Refere wait( newSeedServers( self, recruits, seedServers ) ); state vector> confChanges; wait(newProxies(self, recruits) && newResolvers(self, recruits) && - newTLogServers(self, recruits, oldLogSystem, &confChanges) && newBackupWorkers(self, recruits)); + newTLogServers(self, recruits, oldLogSystem, &confChanges)); return confChanges; } @@ -1112,6 +1094,7 @@ static std::set const& normalMasterErrors() { s.insert( error_code_master_tlog_failed ); s.insert( error_code_master_proxy_failed ); s.insert( error_code_master_resolver_failed ); + s.insert( error_code_master_backup_worker_failed ); s.insert( error_code_recruitment_failed ); s.insert( error_code_no_more_servers ); s.insert( error_code_master_recovery_failed ); @@ -1508,9 +1491,9 @@ ACTOR Future masterServer( MasterInterface mi, Reference