WIP: should be divided into smaller commits.

Jingyu Zhou 2019-07-23 11:45:04 -07:00
parent 17002740bb
commit 11964733b7
8 changed files with 170 additions and 87 deletions

View File

@@ -48,6 +48,10 @@ enum {
 	tagLocalityInvalid = -99
 }; //The TLog and LogRouter require these number to be as compact as possible
+inline bool isPseudoLocality(int8_t locality) {
+	return locality == tagLocalityLogRouterMapped || locality == tagLocalityBackup;
+}
 #pragma pack(push, 1)
 struct Tag {
 	int8_t locality;
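Note: this free function tests the locality value itself, independent of any log system instance (the ILogSystem method is renamed to hasPseudoLocality later in this commit to avoid confusion with it). A usage sketch, following the tLogPopCore changes below:

	// Pops on a pseudo locality are translated back to the real log router tag,
	// so several consumers of the same (-2, id) tag can pop independently.
	if (isPseudoLocality(inputTag.locality) && logSystem->get().isValid()) {
		upTo = logSystem->get()->popPseudoLocalityTag(inputTag.locality, to);
		tagLocality = tagLocalityLogRouter;
	}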

View File

@@ -30,11 +30,12 @@
 struct BackupData {
 	const UID myId;
-	const Tag tag;
+	const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
 	const Version startVersion;
 	Version endVersion; // mutable in a new epoch, i.e., end version for this epoch is known.
 	const LogEpoch epoch;
 	Version minKnownCommittedVersion;
+	Version poppedVersion;
 	AsyncVar<Reference<ILogSystem>> logSystem;
 	Database cx;
 	std::vector<TagsAndMessage> messages;
@@ -42,11 +43,20 @@ struct BackupData {
 	NotifiedVersion version;
 	AsyncTrigger backupDone;
+	CounterCollection cc;
+	Future<Void> logger;
 	explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
 	  : myId(id), tag(req.routerTag), startVersion(req.startVersion),
 	    endVersion(req.endVersion.present() ? req.endVersion.get() : std::numeric_limits<Version>::max()),
-	    epoch(req.epoch), minKnownCommittedVersion(invalidVersion), version(invalidVersion) {
+	    epoch(req.epoch), minKnownCommittedVersion(invalidVersion), poppedVersion(invalidVersion),
+	    version(req.startVersion - 1), cc("BackupWorker", id.toString()) {
 		cx = openDBOnServer(db, TaskDefaultEndpoint, true, true);
+		specialCounter(cc, "PoppedVersion", [this]() { return this->poppedVersion; });
+		specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
+		logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
+		                       "BackupWorkerMetrics");
 	}
 };
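The counter wiring follows the usual fdbserver Stats.h pattern: specialCounter registers a lambda that is polled when metrics are flushed, and traceCounters periodically emits the whole collection as one trace event. A minimal sketch of the same pattern (the MiniRole names are hypothetical):

	struct MiniRoleData {
		Version poppedVersion = invalidVersion;
		CounterCollection cc;
		Future<Void> logger;

		explicit MiniRoleData(UID id) : cc("MiniRole", id.toString()) {
			// Polled, not incremented: the lambda is evaluated at each metrics flush.
			specialCounter(cc, "PoppedVersion", [this]() { return this->poppedVersion; });
			logger = traceCounters("MiniRoleMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
			                       &cc, "MiniRoleMetrics");
		}
	};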
@@ -71,28 +81,33 @@ ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
 // Uploads self->messages to cloud storage.
 ACTOR Future<Void> uploadData(BackupData* self) {
 	state Version popVersion = invalidVersion;
-	state Version lastPopVersion = 0;
 	loop {
 		ASSERT(self->messages.size() == self->versions.size());
-		// TODO: upload messages
 		while (!self->messages.empty()) {
 			popVersion = std::max(popVersion, self->versions[0]);
+			// TODO: consume the messages
 			self->messages.erase(self->messages.begin());
 			self->versions.erase(self->versions.begin());
 		}
+		// TODO: upload messages
 		Future<Void> savedProgress = Void();
-		if (self->logSystem.get() && popVersion > lastPopVersion) {
-			const Tag popTag = self->logSystem.get()->getPseudoPopTag(self->tag, ProcessClass::BackupClass);
-			self->logSystem.get()->pop(popVersion, popTag);
-			lastPopVersion = popVersion;
-			TraceEvent("BackupWorkerPop", self->myId).detail("V", popVersion).detail("Tag", self->tag.toString());
+		if (self->logSystem.get() && popVersion > self->poppedVersion) {
 			savedProgress = saveProgress(self, popVersion);
 		}
-		if (lastPopVersion >= self->endVersion) {
+		wait(delay(30) && savedProgress); // TODO: knobify the delay of 30s
+		if (self->logSystem.get() && popVersion > self->poppedVersion) {
+			const Tag popTag = self->logSystem.get()->getPseudoPopTag(self->tag, ProcessClass::BackupClass);
+			self->logSystem.get()->pop(popVersion, popTag);
+			self->poppedVersion = popVersion;
+			TraceEvent("BackupWorkerPop", self->myId)
+			    .detail("V", popVersion)
+			    .detail("Tag", self->tag.toString())
+			    .detail("PopTag", popTag.toString());
+		}
+		if (self->poppedVersion >= self->endVersion) {
 			self->backupDone.trigger();
 		}
-		wait(delay(30) && savedProgress); // TODO: knobify the delay of 30s
 	}
 }
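The reordering above establishes a save-before-pop invariant. Sketched as a sequence (my reading of the change, not text from the commit):

	// 1. record progress for popVersion in the system keyspace (saveProgress)
	// 2. wait until that transaction has committed (savedProgress)
	// 3. only then pop the pseudo tag up to popVersion, letting TLogs discard data
	//
	// If the pop preceded the durable progress record and this worker failed in
	// between, the versions in (savedVersion, popVersion] would already be gone
	// from the TLogs with no progress entry telling a successor to re-fetch them.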
@@ -122,6 +137,8 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
 		self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
 		// TODO: avoid peeking uncommitted messages
+		// Should we wait until knownCommittedVersion == startVersion - 1? That way we know the
+		// previous epoch has finished before this epoch starts.
 		while (r->hasMessage()) {
 			lastVersion = r->version().version;
 			self->messages.emplace_back(r->getMessage(), std::vector<Tag>());
@@ -140,14 +157,15 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch r
 	state UID lastMasterID(0,0);
 	loop {
 		bool isDisplaced =
-		    ((db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED) ||
-		     (db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::FULLY_RECOVERED));
+		    (db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED);
+		// (db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::FULLY_RECOVERED));
 		isDisplaced = isDisplaced && !db->get().logSystemConfig.hasBackupWorker(self->myId);
 		if (isDisplaced) {
 			TraceEvent("BackupWorkerDisplaced", self->myId)
 			    .detail("RecoveryCount", recoveryCount)
+			    .detail("PoppedVersion", self->poppedVersion)
 			    .detail("DBRecoveryCount", db->get().recoveryCount)
 			    .detail("RecoveryState", (int)db->get().recoveryState);
 			throw worker_removed();
 		}
 		if (db->get().master.id() != lastMasterID) {

View File

@@ -73,11 +73,11 @@ struct LogRouterData {
 		}
 	};
-	UID dbgid;
+	const UID dbgid;
 	Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
 	NotifiedVersion version;
 	NotifiedVersion minPopped;
-	Version startVersion;
+	const Version startVersion;
 	Version minKnownCommittedVersion;
 	Version poppedVersion;
 	Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
@@ -108,7 +108,7 @@ struct LogRouterData {
 	//only callable after getTagData returns a null reference
 	Reference<TagData> createTagData(Tag tag, Version popped, Version knownCommittedVersion) {
-		Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, knownCommittedVersion) );
+		Reference<TagData> newTagData(new TagData(tag, popped, knownCommittedVersion));
 		tag_data[tag.id] = newTagData;
 		return newTagData;
 	}
@@ -221,21 +221,17 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
 	state Reference<ILogSystem::IPeekCursor> r;
 	state Version tagAt = self->version.get() + 1;
 	state Version lastVer = 0;
-	state std::vector<int> tags;
 	loop {
-		loop {
-			choose {
-				when(wait( r ? r->getMore(TaskPriority::TLogCommit) : Never() ) ) {
-					break;
-				}
-				when( wait( dbInfoChange ) ) { //FIXME: does this actually happen?
-					if( self->logSystem->get() )
-						r = self->logSystem->get()->peekLogRouter( self->dbgid, tagAt, self->routerTag );
-					else
-						r = Reference<ILogSystem::IPeekCursor>();
-					dbInfoChange = self->logSystem->onChange();
-				}
+		loop choose {
+			when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; }
+			when(wait(dbInfoChange)) { // FIXME: does this actually happen?
+				if (self->logSystem->get()) {
+					r = self->logSystem->get()->peekLogRouter(self->dbgid, tagAt, self->routerTag);
+				} else {
+					r = Reference<ILogSystem::IPeekCursor>();
+				}
+				dbInfoChange = self->logSystem->onChange();
 			}
 		}
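The restructuring uses flow's `loop choose` sugar, which is equivalent to a choose nested directly inside a loop:

	loop choose {
		when(wait(a)) { /* ... */ }
		when(wait(b)) { /* ... */ }
	}
	// is the same as:
	loop {
		choose {
			when(wait(a)) { /* ... */ }
			when(wait(b)) { /* ... */ }
		}
	}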
@@ -275,7 +271,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
 			TagsAndMessage tagAndMsg;
 			tagAndMsg.message = r->getMessageWithTags();
-			tags.clear();
+			std::vector<int> tags;
 			self->logSet.getPushLocations(r->getTags(), tags, 0);
 			tagAndMsg.tags.reserve(arena, tags.size());
 			for (const auto& t : tags) {
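Moving `tags` from a `state` variable to a plain local is safe here because, in an ACTOR, only `state` variables survive across wait() points; `tags` is filled and consumed between waits, so a fresh local per message replaces the manual tags.clear() and carries no cross-iteration state:

	std::vector<int> tags; // empty for every message, no clear() needed
	self->logSet.getPushLocations(r->getTags(), tags, 0);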

View File

@@ -714,6 +714,11 @@ struct ILogSystem {
 	// Call only on an ILogSystem obtained from recoverAndEndEpoch()
 	// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
+	virtual Version getStartVersion() const = 0; // Returns the start version of current epoch.
+
+	// Returns end versions for old epochs that this log system is aware of, excluding the current epoch.
+	virtual std::map<LogEpoch, Version> getEpochEndVersions() const = 0;
+
 	virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config,
 		LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags, Reference<AsyncVar<bool>> const& recruitmentStalled ) = 0;
 	// Call only on an ILogSystem obtained from recoverAndEndEpoch()
@@ -736,6 +741,7 @@ struct ILogSystem {
 	virtual bool hasRemoteLogs() const = 0;
 	virtual Tag getRandomRouterTag() const = 0;
+	virtual int getLogRouterTags() const = 0; // Returns the number of router tags.
 	virtual Tag getRandomTxsTag() const = 0;
@@ -749,9 +755,12 @@ struct ILogSystem {
 	// process class doesn't use pseudo tag, return the same tag.
 	virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) = 0;
-	virtual bool isPseudoLocality(int8_t locality) = 0;
+	virtual bool hasPseudoLocality(int8_t locality) = 0;
 	virtual Version popPseudoLocalityTag(int8_t locality, Version upTo) = 0;
+	virtual void setBackupWorkers(
+	    std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers) = 0;
 };
 struct LengthPrefixedStringRef {
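The rename keeps the two predicates introduced by this commit distinct:

	isPseudoLocality(locality);              // free function: is the locality value itself
	                                         // one of the pseudo localities?
	logSystem->hasPseudoLocality(locality);  // does this particular log system instance
	                                         // track that pseudo locality?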

View File

@@ -695,9 +695,13 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
 	}
 	state Version upTo = to;
 	int8_t tagLocality = inputTag.locality;
-	if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
-		upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to);
-		tagLocality = tagLocalityLogRouter;
+	if (isPseudoLocality(tagLocality)) {
+		if (logData->logSystem->get().isValid()) {
+			upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to);
+			tagLocality = tagLocalityLogRouter;
+		} else {
+			TraceEvent("TLogPopNoLogSystem", self->dbgid).detail("Locality", tagLocality).detail("Version", upTo);
+		}
 	}
 	state Tag tag(tagLocality, inputTag.id);
 	auto tagData = logData->getTagData(tag);

View File

@@ -983,9 +983,15 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
 	}
 	state Version upTo = to;
 	int8_t tagLocality = inputTag.locality;
-	if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
-		upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to);
-		tagLocality = tagLocalityLogRouter;
+	if (isPseudoLocality(tagLocality)) {
+		if (logData->logSystem->get().isValid()) {
+			upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to);
+			tagLocality = tagLocalityLogRouter;
+		} else {
+			// TODO: if this happens, need to save the popped version? or discard the pop?
+			TraceEvent("TLogPopNoLogSystem", self->dbgid).detail("Locality", tagLocality).detail("Version", upTo);
+		}
 	}
 	state Tag tag(tagLocality, inputTag.id);
 	auto tagData = logData->getTagData(tag);
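One possible resolution of the TODO above (hypothetical, not part of this commit): instead of dropping the pop, remember it and replay it once a log system is available, so the pseudo-locality bookkeeping is not silently lost.

	// Hypothetical sketch; pendingPseudoPops would be an assumed new member, not real code:
	if (!logData->logSystem->get().isValid()) {
		auto& pending = logData->pendingPseudoPops[inputTag];
		pending = std::max(pending, to); // coalesce to the highest requested version
		return Void();                   // retry when logData->logSystem changes
	}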

View File

@@ -250,7 +250,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 			break;
 		case ProcessClass::BackupClass:
-			if (tag.locality == tagLocalityLogRouter && pseudoLocalities.count(tag.locality) > 0) {
+			if (tag.locality == tagLocalityBackup) {
+				ASSERT(pseudoLocalities.count(tag.locality) > 0);
 				tag.locality = tagLocalityBackup;
 			}
 			break;
@@ -261,18 +262,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return tag;
 	}
-	bool isPseudoLocality(int8_t locality) override {
-		return pseudoLocalities.count(locality) > 0;
-	}
+	bool hasPseudoLocality(int8_t locality) override { return pseudoLocalities.count(locality) > 0; }
 	Version popPseudoLocalityTag(int8_t locality, Version upTo) override {
-		ASSERT(isPseudoLocality(locality));
+		ASSERT(hasPseudoLocality(locality));
 		auto& localityVersion = pseudoLocalityPopVersion[locality];
 		localityVersion = std::max(localityVersion, upTo);
 		Version minVersion = localityVersion;
 		for (const auto& it : pseudoLocalityPopVersion) {
 			minVersion = std::min(minVersion, it.second);
 		}
+		TraceEvent("Pop").detail("L", locality).detail("Version", upTo).detail("PopVersion", minVersion);
 		return minVersion;
 	}
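A worked example of the min computation above: with both pseudo localities mapped onto the same log router tag, popping one locality only advances the shared tag as far as the slowest consumer.

	popPseudoLocalityTag(tagLocalityLogRouterMapped, 80);  // returns 80
	popPseudoLocalityTag(tagLocalityBackup, 100);          // returns min(100, 80) = 80

	// The TLog therefore pops (-2, id) only to 80; versions 81..100 stay on disk
	// until the "mapped" consumer catches up.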
@@ -1323,6 +1323,26 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return tLogs[0]->tLogVersion;
 	}
+	int getLogRouterTags() const override { return logRouterTags; }
+
+	Version getStartVersion() const override {
+		ASSERT(tLogs.size() > 0);
+		return tLogs[0]->startVersion;
+	}
+
+	std::map<LogEpoch, Version> getEpochEndVersions() const override {
+		std::map<LogEpoch, Version> epochEndVersion;
+		for (const auto& old : oldLogData) {
+			epochEndVersion[old.epoch] = old.epochEnd;
+		}
+		return epochEndVersion;
+	}
+
+	void setBackupWorkers(std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers) override {
+		ASSERT(tLogs.size() > 0);
+		tLogs[0]->backupWorkers = backupWorkers;
+	}
+
 	ACTOR static Future<Void> monitorLog(Reference<AsyncVar<OptionalInterface<TLogInterface>>> logServer, Reference<AsyncVar<bool>> failed) {
 		state Future<Void> waitFailure;
 		loop {
@@ -1868,47 +1888,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return localTags;
 	}
-	ACTOR static Future<Void> loadBackupProgress() {
-		// get a map of UID -> recoveryCount, savedVersion
-		// for each old epoch:
-		//    if savedVersion >= epochEnd - 1 = knownCommittedVersion
-		//      skip or remove this entry
-		//    else
-		//      recover/backup [savedVersion + 1, epochEnd - 1]
-		//      use the old recoveryCount and tag. Make sure old worker stopped.
-		return Void();
-	}
-
-	ACTOR static Future<Void> recruitBackupWorkers(Reference<TagPartitionedLogSystem> logSystem,
-	                                               RecruitFromConfigurationReply recruits, LogEpoch recoveryCount) {
-		if (recruits.backupWorkers.size() == 0) return Void();
-
-		std::vector<Future<BackupInterface>> initializationReplies;
-		for (int i = 0; i < logSystem->logRouterTags; i++) {
-			const auto& worker = recruits.backupWorkers[i % recruits.backupWorkers.size()];
-			InitializeBackupRequest req(g_random->randomUniqueID());
-			req.epoch = recoveryCount;
-			req.routerTag = Tag(tagLocalityLogRouter, i);
-			req.startVersion = logSystem->tLogs[0]->startVersion;
-			TraceEvent("BackupRecruitment")
-			    .detail("WorkerID", worker.id())
-			    .detail("Epoch", recoveryCount)
-			    .detail("StartVersion", req.startVersion);
-			initializationReplies.push_back(transformErrors(
-			    throwErrorOr(worker.backup.getReplyUnlessFailedFor(req, SERVER_KNOBS->BACKUP_TIMEOUT,
-			                                                       SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
-			    master_recovery_failed()));
-		}
-
-		std::vector<BackupInterface> newRecruits = wait(getAll(initializationReplies));
-		for (const auto& interf : newRecruits) {
-			logSystem->tLogs[0]->backupWorkers.emplace_back(
-			    new AsyncVar<OptionalInterface<BackupInterface>>(OptionalInterface<BackupInterface>(interf)));
-		}
-		return Void();
-	}
-
 	ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality, std::vector<Tag> allTags ) {
 		TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
 		state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
@@ -2075,6 +2054,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 			logSystem->expectedLogSets++;
 			logSystem->addPseudoLocality(tagLocalityLogRouterMapped);
 			logSystem->addPseudoLocality(tagLocalityBackup);
+			TraceEvent("AddPseudoLocality", logSystem->getDebugID())
+			    .detail("Locality1", "LogRouterMapped")
+			    .detail("Locality2", "Backup");
 		}
 		logSystem->tLogs.emplace_back(new LogSet());
@@ -2240,7 +2222,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		for( int i = 0; i < recr.tLogs.size(); i++ )
 			initializationReplies.push_back( transformErrors( throwErrorOr( recr.tLogs[i].tLog.getReplyUnlessFailedFor( reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
-		state Future<Void> recruitBackup = recruitBackupWorkers(logSystem, recr, recoveryCount);
 		state std::vector<Future<Void>> recoveryComplete;
 		if(region.satelliteTLogReplicationFactor > 0) {
@@ -2314,7 +2295,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		}
 		wait( waitForAll( initializationReplies ) || oldRouterRecruitment );
-		wait(recruitBackup);
 		for( int i = 0; i < initializationReplies.size(); i++ ) {
 			logSystem->tLogs[0]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );

View File

@@ -223,6 +223,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
 	int8_t safeLocality;
 	int8_t primaryLocality;
+	std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.
+
 	MasterData(
 		Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
 		MasterInterface const& myInterface,
@@ -592,6 +594,7 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything( Refere
 			self->remoteDcIds.push_back(recruits.dcId.get() == self->configuration.regions[0].dcId ? self->configuration.regions[1].dcId : self->configuration.regions[0].dcId);
 		}
 	}
+	self->backupWorkers.swap(recruits.backupWorkers);
 	TraceEvent("MasterRecoveryState", self->dbgid)
 	    .detail("StatusCode", RecoveryStatus::initializing_transaction_servers)
@@ -1238,6 +1241,7 @@ ACTOR Future<std::vector<std::tuple<UID, LogEpoch, Version>>> getBackupProgress(
 		try {
 			state std::vector<std::tuple<UID, LogEpoch, Version>> progress;
 			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
 			Standalone<RangeResultRef> results = wait(tr.getRange(backupProgressKeys, CLIENT_KNOBS->TOO_MANY));
 			ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
@@ -1257,6 +1261,68 @@ ACTOR Future<std::vector<std::tuple<UID, LogEpoch, Version>>> getBackupProgress(
 	}
 }
+ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self) {
+	if (self->backupWorkers.size() == 0) return Void();
+
+	LogEpoch epoch = self->cstate.myDBState.recoveryCount;
+	state Future<std::vector<std::tuple<UID, LogEpoch, Version>>> backupProgress = getBackupProgress(self);
+	state std::map<LogEpoch, Version> epochEndVersion = self->logSystem->getEpochEndVersions();
+
+	std::vector<Future<BackupInterface>> initializationReplies;
+	const int logRouterTags = self->logSystem->getLogRouterTags();
+	const Version startVersion = self->logSystem->getStartVersion();
+	for (int i = 0; i < logRouterTags; i++) {
+		const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
+		InitializeBackupRequest req(g_random->randomUniqueID());
+		req.epoch = epoch;
+		req.routerTag = Tag(tagLocalityLogRouter, i);
+		req.startVersion = startVersion;
+		TraceEvent("BackupRecruitment", self->dbgid)
+		    .detail("WorkerID", worker.id())
+		    .detail("Epoch", epoch)
+		    .detail("StartVersion", req.startVersion);
+		initializationReplies.push_back(
+		    transformErrors(throwErrorOr(worker.backup.getReplyUnlessFailedFor(
+		                        req, SERVER_KNOBS->BACKUP_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
+		                    master_recovery_failed()));
+	}
+
+	std::vector<BackupInterface> newRecruits = wait(getAll(initializationReplies));
+	std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
+	for (const auto& interf : newRecruits) {
+		backupWorkers.emplace_back(
+		    new AsyncVar<OptionalInterface<BackupInterface>>(OptionalInterface<BackupInterface>(interf)));
+	}
+	self->logSystem->setBackupWorkers(backupWorkers);
+
+	// get a map of UID -> recoveryCount, savedVersion
+	// for each old epoch:
+	//    if savedVersion >= epochEnd - 1 = knownCommittedVersion
+	//      skip or remove this entry
+	//    else
+	//      recover/backup [savedVersion + 1, epochEnd - 1]
+	//      use the old recoveryCount and tag. Make sure old worker stopped.
+	state std::vector<std::tuple<UID, LogEpoch, Version>> progress = wait(backupProgress);
+	for (const auto& t : progress) {
+		const UID uid = std::get<0>(t);
+		const LogEpoch epoch = std::get<1>(t);
+		const Version version = std::get<2>(t);
+		auto it = epochEndVersion.find(epoch);
+		if (it != epochEndVersion.end()) {
+			TraceEvent("BW", self->dbgid)
+			    .detail("UID", uid)
+			    .detail("Epoch", epoch)
+			    .detail("Version", version)
+			    .detail("BackedupVersion", it->second);
+		} else {
+			TraceEvent("BW", self->dbgid).detail("UID", uid).detail("Epoch", epoch).detail("Version", version);
+		}
+	}
+	TraceEvent("BackupRecruitmentDone", self->dbgid);
+	return Void();
+}
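A sketch of what the comment block above implies for old epochs (a hypothetical continuation, not implemented in this commit): each saved-progress entry, joined with the epoch-end versions, yields the range a new worker would still have to cover.

	// epoch -> [begin, end] still needing backup; rangesToRecover is a hypothetical name
	std::map<LogEpoch, std::pair<Version, Version>> rangesToRecover;
	for (const auto& t : progress) {
		const LogEpoch oldEpoch = std::get<1>(t);
		const Version savedVersion = std::get<2>(t);
		auto it = epochEndVersion.find(oldEpoch);
		if (it == epochEndVersion.end()) continue;    // epoch unknown to this log system
		if (savedVersion >= it->second - 1) continue; // already backed up to the end
		// recover/backup [savedVersion + 1, epochEnd - 1] with the old epoch's tag,
		// after making sure the old worker has stopped.
		rangesToRecover[oldEpoch] = { savedVersion + 1, it->second - 1 };
	}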
 ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
 	state TraceInterval recoveryInterval("MasterRecovery");
 	state double recoverStartTime = now();
@@ -1284,7 +1350,6 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
 	state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
 	state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality, &self->forceRecovery);
-	state Future<std::vector<std::tuple<UID, LogEpoch, Version>>> backupProgress = getBackupProgress(self);
 	DBCoreState newState = self->cstate.myDBState;
 	newState.recoveryCount++;
self->addActor.send( changeCoordinators(self) ); self->addActor.send( changeCoordinators(self) );
self->addActor.send( configurationMonitor( self ) ); self->addActor.send( configurationMonitor( self ) );
self->addActor.send(recruitBackupWorkers(self));
wait( Future<Void>(Never()) ); wait( Future<Void>(Never()) );
throw internal_error(); throw internal_error();