Log tlog initialization

This commit is contained in:
Zhe Wu 2022-07-20 15:05:10 -07:00
parent 45c8a4e09d
commit bd99f4fa3b

View File

@ -2614,6 +2614,8 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
req.tLogLocalities = localities;
req.tLogPolicy = logSet->tLogPolicy;
req.locality = remoteLocality;
TraceEvent("RemoteTLogRouterReplies", self->dbgid)
.detail("WorkerID", remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].id());
logRouterInitializationReplies.push_back(transformErrors(
throwErrorOr(
remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor(
@ -2693,11 +2695,13 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
}
remoteTLogInitializationReplies.reserve(remoteWorkers.remoteTLogs.size());
for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++)
for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
TraceEvent("RemoteTLogReplies", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
remoteTLogInitializationReplies.push_back(transformErrors(
throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor(
remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
cluster_recovery_failed()));
}
TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs")
.detail("StartVersion", logSet->startVersion)
@ -2966,11 +2970,13 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
}
initializationReplies.reserve(recr.tLogs.size());
for (int i = 0; i < recr.tLogs.size(); i++)
for (int i = 0; i < recr.tLogs.size(); i++) {
TraceEvent("PrimaryTLogReplies", logSystem->getDebugID()).detail("WorkerID", recr.tLogs[i].id());
initializationReplies.push_back(transformErrors(
throwErrorOr(recr.tLogs[i].tLog.getReplyUnlessFailedFor(
reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
cluster_recovery_failed()));
}
state std::vector<Future<Void>> recoveryComplete;
@ -3034,11 +3040,14 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
}
satelliteInitializationReplies.reserve(recr.satelliteTLogs.size());
for (int i = 0; i < recr.satelliteTLogs.size(); i++)
for (int i = 0; i < recr.satelliteTLogs.size(); i++) {
TraceEvent("PrimarySatelliteTLogReplies", logSystem->getDebugID())
.detail("WorkerID", recr.satelliteTLogs[i].id());
satelliteInitializationReplies.push_back(transformErrors(
throwErrorOr(recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor(
sreqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
cluster_recovery_failed()));
}
wait(waitForAll(satelliteInitializationReplies) || oldRouterRecruitment);