/*
 * RestoreController.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file implements the functions for RestoreController role

#include "fdbrpc/RangeMap.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/MutationList.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreCommon.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreController.actor.h"
#include "fdbserver/RestoreApplier.actor.h"
#include "fdbserver/RestoreLoader.actor.h"

#include "flow/Platform.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// Forward declarations, so the actors and helpers below can be referenced before their definitions.

// Preparation of the database and of the backup file lists / per-range versions.
ACTOR static Future<Void> clearDB(Database cx);
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
                                                std::vector<RestoreFileFR>* rangeFiles,
                                                std::vector<RestoreFileFR>* logFiles,
                                                Version* minRangeVersion,
                                                Database cx,
                                                RestoreRequest request);
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
                                             std::vector<RestoreFileFR>* pRangeFiles,
                                             Key url);

// Request-level and version-batch-level drivers.
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self,
                                                   Database cx,
                                                   RestoreRequest request);
ACTOR static Future<Void> startProcessRestoreRequests(Reference<RestoreControllerData> self, Database cx);
ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreControllerData> self,
                                                            int batchIndex,
                                                            Database cx,
                                                            RestoreRequest request,
                                                            VersionBatch versionBatch);

// Role recruitment and distribution of system info to the recruited loaders.
ACTOR static Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWorker,
                                              Reference<RestoreControllerData> controllerData);
ACTOR static Future<Void> distributeRestoreSysInfo(Reference<RestoreControllerData> controllerData,
                                                   KeyRangeMap<Version>* pRangeVersions);

// Request collection, per-batch notifications to loaders/appliers, and completion/liveness signalling.
ACTOR static Future<std::vector<RestoreRequest>> collectRestoreRequests(Database cx);
ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInterface> appliersInterf,
                                                 std::map<UID, RestoreLoaderInterface> loadersInterf,
                                                 int batchIndex);
ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<ControllerBatchData> batchData,
                                                        Reference<ControllerBatchStatus> batchStatus,
                                                        std::map<UID, RestoreApplierInterface> appliersInterf,
                                                        int batchIndex,
                                                        NotifiedVersion* finishedBatch);
ACTOR static Future<Void> notifyLoadersVersionBatchFinished(std::map<UID, RestoreLoaderInterface> loadersInterf,
                                                            int batchIndex);
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreControllerData> self, bool terminate);
ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreControllerData> self, Database cx);
ACTOR static Future<Void> updateHeartbeatTime(Reference<RestoreControllerData> self);
ACTOR static Future<Void> checkRolesLiveness(Reference<RestoreControllerData> self);

// Non-actor helper: computes batchData->rangeToApplier from the samples collected in batchData.
void splitKeyRangeForAppliers(Reference<ControllerBatchData> batchData,
                              std::map<UID, RestoreApplierInterface> appliersInterf,
                              int batchIndex);

// Serves RestoreSamplesRequest messages arriving on ci.samples.
// For each request, the samples are folded into the ControllerBatchData of the request's version batch
// (metric per key plus the running samplesSize) and the sender is acknowledged. A request id that was
// already recorded in the batch's sampleMsgs is acknowledged without being applied a second time.
// Any error is traced at SevWarn and ends the service loop; the actor then returns Void().
ACTOR Future<Void> sampleBackups(Reference<RestoreControllerData> self, RestoreControllerInterface ci) {
	loop {
		try {
			RestoreSamplesRequest sampleReq = waitNext(ci.samples.getFuture());
			TraceEvent(SevDebug, "FastRestoreControllerSampleBackups")
			    .detail("SampleID", sampleReq.id)
			    .detail("BatchIndex", sampleReq.batchIndex)
			    .detail("Samples", sampleReq.samples.size());
			// batchIndex starts from 1
			ASSERT(sampleReq.batchIndex <= self->batch.size());

			Reference<ControllerBatchData> batchData = self->batch[sampleReq.batchIndex];
			ASSERT(batchData.isValid());

			// Only the first delivery of a given request id contributes samples.
			const bool firstDelivery = batchData->sampleMsgs.find(sampleReq.id) == batchData->sampleMsgs.end();
			if (firstDelivery) {
				batchData->sampleMsgs.insert(sampleReq.id);
				for (auto& sample : sampleReq.samples) {
					batchData->samples.addMetric(sample.key, sample.size);
					batchData->samplesSize += sample.size;
				}
			}

			// Duplicates and first deliveries are acknowledged identically.
			sampleReq.reply.send(RestoreCommonReply(sampleReq.id));
		} catch (Error& e) {
			TraceEvent(SevWarn, "FastRestoreControllerSampleBackupsError", self->id()).error(e);
			break;
		}
	}

	return Void();
}

// Entry point of the restore controller role hosted by controllerWorker.
// Creates the RestoreControllerData keyed by the controller interface id, recruits restore roles,
// registers the background actors (role liveness check, process metrics, sample collection) through
// self->addActor, and then waits until startProcessRestoreRequests completes or the actor collection
// `error` becomes ready, whichever happens first.
// Every error is swallowed: anything other than operation_cancelled is traced at SevError, and the
// actor returns Void() in all cases.
ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controllerWorker, Database cx) {
	ASSERT(controllerWorker.isValid());
	ASSERT(controllerWorker->controllerInterf.present());
	state Reference<RestoreControllerData> self =
	    makeReference<RestoreControllerData>(controllerWorker->controllerInterf.get().id());
	// Collects the actors sent to self->addActor below.
	state Future<Void> error = actorCollection(self->addActor.getFuture());

	try {
		// recruitRestoreRoles must come after controllerWorker has finished collectWorkerInterface
		wait(recruitRestoreRoles(controllerWorker, self));

		// Heartbeat updates are currently disabled:
		// self->addActor.send(updateHeartbeatTime(self));
		self->addActor.send(checkRolesLiveness(self));
		self->addActor.send(updateProcessMetrics(self));
		self->addActor.send(traceProcessMetrics(self, "RestoreController"));
		self->addActor.send(sampleBackups(self, controllerWorker->controllerInterf.get()));

		wait(startProcessRestoreRequests(self, cx) || error);
	} catch (Error& e) {
		// Cancellation is expected and not reported; anything else is logged as an error.
		if (e.code() != error_code_operation_cancelled) {
			TraceEvent(SevError, "FastRestoreControllerStart").detail("Reason", "Unexpected unhandled error").error(e);
		}
	}

	return Void();
}

// RestoreWorker that has the restore controller role: recruit a role for each worker.
// Walks controllerWorker->workerInterfaces in iteration order and assigns
//   indexes [0, FASTRESTORE_NUM_APPLIERS)                                  -> Applier
//   indexes [FASTRESTORE_NUM_APPLIERS, NUM_APPLIERS + FASTRESTORE_NUM_LOADERS) -> Loader
// Workers beyond that range receive no role. The recruit requests are sent as one batch; the replied
// interfaces are stored in controllerData->appliersInterf / loadersInterf, and
// controllerData->recruitedRoles is signalled once all replies have been processed.
ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWorker,
                                       Reference<RestoreControllerData> controllerData) {
	state int nodeIndex = 0;
	state RestoreRole role = RestoreRole::Invalid;

	// Validate both references before anything below dereferences them.
	ASSERT(controllerWorker.isValid());
	ASSERT(controllerData.isValid());

	TraceEvent("FastRestoreController", controllerData->id())
	    .detail("RecruitRestoreRoles", controllerWorker->workerInterfaces.size())
	    .detail("NumLoaders", SERVER_KNOBS->FASTRESTORE_NUM_LOADERS)
	    .detail("NumAppliers", SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS);
	ASSERT(controllerData->loadersInterf.empty() && controllerData->appliersInterf.empty());
	ASSERT(controllerWorker->controllerInterf.present());

	ASSERT(SERVER_KNOBS->FASTRESTORE_NUM_LOADERS > 0 && SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS > 0);
	// We assign 1 role per worker for now
	ASSERT(SERVER_KNOBS->FASTRESTORE_NUM_LOADERS + SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS <=
	       controllerWorker->workerInterfaces.size());

	// Assign a role to each worker
	std::vector<std::pair<UID, RestoreRecruitRoleRequest>> requests;
	for (auto& workerInterf : controllerWorker->workerInterfaces) {
		// nodeIndex starts at 0 and only grows, so only the upper bounds need checking.
		if (nodeIndex < SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS) {
			// [0, numApplier) are appliers
			role = RestoreRole::Applier;
		} else if (nodeIndex < SERVER_KNOBS->FASTRESTORE_NUM_LOADERS + SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS) {
			// [numApplier, numApplier + numLoader) are loaders
			role = RestoreRole::Loader;
		} else {
			// All requested roles have been handed out; remaining workers stay without a role.
			break;
		}

		TraceEvent("FastRestoreController", controllerData->id())
		    .detail("WorkerNode", workerInterf.first)
		    .detail("NodeRole", role)
		    .detail("NodeIndex", nodeIndex);
		requests.emplace_back(workerInterf.first,
		                      RestoreRecruitRoleRequest(controllerWorker->controllerInterf.get(), role, nodeIndex));
		nodeIndex++;
	}

	state std::vector<RestoreRecruitRoleReply> replies;
	wait(getBatchReplies(&RestoreWorkerInterface::recruitRole, controllerWorker->workerInterfaces, requests, &replies));
	// Record the interface of every recruited role, keyed by the interface id.
	for (auto& reply : replies) {
		if (reply.role == RestoreRole::Applier) {
			ASSERT_WE_THINK(reply.applier.present());
			controllerData->appliersInterf[reply.applier.get().id()] = reply.applier.get();
		} else if (reply.role == RestoreRole::Loader) {
			ASSERT_WE_THINK(reply.loader.present());
			controllerData->loadersInterf[reply.loader.get().id()] = reply.loader.get();
		} else {
			TraceEvent(SevError, "FastRestoreController").detail("RecruitRestoreRolesInvalidRole", reply.role);
		}
	}
	controllerData->recruitedRoles.send(Void());
	TraceEvent("FastRestoreRecruitRestoreRolesDone", controllerData->id())
	    .detail("Workers", controllerWorker->workerInterfaces.size())
	    .detail("RecruitedRoles", replies.size());

	return Void();
}

// Sends the same RestoreSysInfoRequest to every loader in controllerData->loadersInterf.
// The request carries a RestoreSysInfo built from controllerData->appliersInterf together with the
// contents of *pRangeVersions flattened into a vector of (key range, version) pairs. Each flattened
// range is also traced. Completes once sendBatchRequests for all loaders has completed.
ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreControllerData> controllerData,
                                            KeyRangeMap<Version>* pRangeVersions) {
	ASSERT(controllerData.isValid());
	ASSERT(!controllerData->loadersInterf.empty());

	// Construct serializable KeyRange versions, tracing every entry on the way.
	Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> serializedRangeVersions;
	auto versionedRanges = pRangeVersions->ranges();
	int rangeIndex = 0;
	for (auto it = versionedRanges.begin(); it != versionedRanges.end(); ++it) {
		serializedRangeVersions.push_back(serializedRangeVersions.arena(),
		                                  std::make_pair(KeyRangeRef(it->begin(), it->end()), it->value()));
		TraceEvent("DistributeRangeVersions")
		    .detail("RangeIndex", rangeIndex)
		    .detail("RangeBegin", it->begin())
		    .detail("RangeEnd", it->end())
		    .detail("RangeVersion", it->value());
		++rangeIndex;
	}

	// One request per loader, all sharing the same payload.
	RestoreSysInfo sysInfo(controllerData->appliersInterf);
	std::vector<std::pair<UID, RestoreSysInfoRequest>> sysInfoRequests;
	for (auto& loaderEntry : controllerData->loadersInterf) {
		sysInfoRequests.emplace_back(loaderEntry.first, RestoreSysInfoRequest(sysInfo, serializedRangeVersions));
	}

	TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", controllerData->id())
	    .detail("Loaders", controllerData->loadersInterf.size());
	wait(sendBatchRequests(
	    &RestoreLoaderInterface::updateRestoreSysInfo, controllerData->loadersInterf, sysInfoRequests));
	TraceEvent("FastRestoreDistributeRestoreSysInfoToLoadersDone", controllerData->id())
	    .detail("Loaders", controllerData->loadersInterf.size());

	return Void();
}

// The server of the restore controller. It drives the restore progress with the following steps:
// 1) Collect the restore requests (collectRestoreRequests), which are sent by RestoreTool operated by DBA;
// 2) For each request, in order: reset the controller's per-request state, clear the key range that will
//    be restored (request.range with removePrefix stripped and addPrefix applied), run
//    processRestoreRequest, then call notifyRestoreCompleted(self, false);
//    Inside processRestoreRequest:
//    2.1) Sample workload to decide the key range for each applier;
//    2.2) Send each loader the map of key-range to applier interface;
//    2.3) Construct requests of which file should be loaded by which loader, and send requests to loaders;
// 3) After processing all restore requests, finish restore via signalRestoreCompleted.
// An error raised while processing a request is traced at SevError and swallowed: the remaining
// requests are skipped and step 3 still runs.
// NOTE(review): the earlier version of this comment also listed "Lock database" as a first step; no
// locking is performed in this function — confirm where it happens.
ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreControllerData> self, Database cx) {
	// NOTE(review): randomUID is not referenced again in this function. The call still draws from
	// deterministicRandom(), so removing it may change simulation determinism — confirm before deleting.
	state UID randomUID = deterministicRandom()->randomUniqueID();
	state std::vector<RestoreRequest> restoreRequests = wait(collectRestoreRequests(cx));
	state int restoreIndex = 0;

	TraceEvent("FastRestoreControllerWaitOnRestoreRequests", self->id())
	    .detail("RestoreRequests", restoreRequests.size());

	// TODO: Sanity check restoreRequests' key ranges do not overlap

	// Step: Perform the restore requests
	try {
		for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) {
			state RestoreRequest request = restoreRequests[restoreIndex];
			// Key range as it will exist in the destination database after prefix rewriting.
			state KeyRange range = request.range.removePrefix(request.removePrefix).withPrefix(request.addPrefix);
			TraceEvent("FastRestoreControllerProcessRestoreRequests", self->id())
			    .detail("RestoreRequestInfo", request.toString())
			    .detail("TransformedKeyRange", range);
			// TODO: Initialize controllerData and all loaders and appliers' data for each restore request!
			self->resetPerRestoreRequest();

			// clear the key range that will be restored
			wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr->setOption(FDBTransactionOptions::LOCK_AWARE);
				tr->clear(range);
				return Void();
			}));

			// The version returned by processRestoreRequest is not used here.
			wait(success(processRestoreRequest(self, cx, request)));
			wait(notifyRestoreCompleted(self, false));
		}
	} catch (Error& e) {
		// Report which request failed when the index is still in range; otherwise report the counts.
		if (restoreIndex < restoreRequests.size()) {
			TraceEvent(SevError, "FastRestoreControllerProcessRestoreRequestsFailed", self->id())
			    .error(e)
			    .detail("RestoreRequest", restoreRequests[restoreIndex].toString());
		} else {
			TraceEvent(SevError, "FastRestoreControllerProcessRestoreRequestsFailed", self->id())
			    .error(e)
			    .detail("RestoreRequests", restoreRequests.size())
			    .detail("RestoreIndex", restoreIndex);
		}
	}

	// Step: Notify all restore requests have been handled by cleaning up the restore keys
	wait(signalRestoreCompleted(self, cx));

	TraceEvent("FastRestoreControllerRestoreCompleted", self->id());

	return Void();
}

// Progress monitor for one restore request: repeatedly traces the request and the current value of
// self->finishedBatch, pausing FASTRESTORE_VB_MONITOR_DELAY between traces.
// The loop has no exit condition, so the actor never completes on its own.
ACTOR static Future<Void> monitorFinishedVersion(Reference<RestoreControllerData> self, RestoreRequest request) {
	loop {
		TraceEvent("FastRestoreMonitorFinishedVersion", self->id())
		    .detail("RestoreRequest", request.toString())
		    .detail("BatchIndex", self->finishedBatch.get());
		wait(delay(SERVER_KNOBS->FASTRESTORE_VB_MONITOR_DELAY));
	}
}

// Runs a single restore request on the controller:
//   1) opens the backup container for request.url and collects the range/log file descriptions together
//      with the target version and the minimum range version;
//   2) sorts the files and builds the key-range -> version map (populated from the range files only when
//      FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE is set; otherwise the initial map is just traced);
//   3) distributes the system info and the range versions to the loaders;
//   4) splits the files into version batches and dispatches them, keeping fewer than
//      FASTRESTORE_VB_PARALLELISM batches running at dispatch time unless out-of-order release was
//      chosen (simulation only), with FASTRESTORE_VB_LAUNCH_DELAY between launches;
//   5) waits for all batches; an error from the batches is traced at SevError and swallowed.
// Returns the target version resolved by collectBackupFiles (asserted > 0), i.e. the version the
// version batches were built for. request.targetVersion is not returned because it may still hold the
// -1 placeholder that loadFilesOnLoaders checks for.
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self,
                                                   Database cx,
                                                   RestoreRequest request) {
	state std::vector<RestoreFileFR> rangeFiles;
	state std::vector<RestoreFileFR> logFiles;
	state Version minRangeVersion = MAX_VERSION;

	self->initBackupContainer(request.url);

	// Get all backup files' description and save them to files
	state Version targetVersion =
	    wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, &minRangeVersion, cx, request));
	ASSERT(targetVersion > 0);
	ASSERT(minRangeVersion != MAX_VERSION); // otherwise, all mutations will be skipped

	// Range files use their own operator<; log files are ordered by (endVersion, beginVersion, fileIndex,
	// fileName).
	std::sort(rangeFiles.begin(), rangeFiles.end());
	std::sort(logFiles.begin(), logFiles.end(), [](RestoreFileFR const& f1, RestoreFileFR const& f2) -> bool {
		return std::tie(f1.endVersion, f1.beginVersion, f1.fileIndex, f1.fileName) <
		       std::tie(f2.endVersion, f2.beginVersion, f2.fileIndex, f2.fileName);
	});

	// Build range versions: version of key ranges in range file
	state KeyRangeMap<Version> rangeVersions(minRangeVersion, allKeys.end);
	if (SERVER_KNOBS->FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE) {
		wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url));
	} else {
		// Debug purpose, dump range versions
		auto ranges = rangeVersions.ranges();
		int i = 0;
		for (auto r = ranges.begin(); r != ranges.end(); ++r) {
			TraceEvent(SevDebug, "SingleRangeVersion")
			    .detail("RangeIndex", i++)
			    .detail("RangeBegin", r->begin())
			    .detail("RangeEnd", r->end())
			    .detail("RangeVersion", r->value());
		}
	}

	wait(distributeRestoreSysInfo(self, &rangeVersions));

	// Divide files into version batches.
	self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches, targetVersion);
	self->dumpVersionBatches(self->versionBatches);

	state std::vector<Future<Void>> fBatches;
	state std::vector<VersionBatch> versionBatches; // To randomize invoking order of version batchs
	for (auto& vb : self->versionBatches) {
		versionBatches.push_back(vb.second);
	}

	// releaseVBOutOfOrder can only be true in simulation
	state bool releaseVBOutOfOrder = g_network->isSimulated() ? deterministicRandom()->random01() < 0.5 : false;
	ASSERT(g_network->isSimulated() || !releaseVBOutOfOrder);
	if (releaseVBOutOfOrder) {
		// Randomize invoking order of version batches
		int permTimes = deterministicRandom()->randomInt(0, 100);
		while (permTimes-- > 0) {
			std::next_permutation(versionBatches.begin(), versionBatches.end());
		}
	}

	self->addActor.send(monitorFinishedVersion(self, request));
	state std::vector<VersionBatch>::iterator versionBatch = versionBatches.begin();
	for (; versionBatch != versionBatches.end(); versionBatch++) {
		while (self->runningVersionBatches.get() >= SERVER_KNOBS->FASTRESTORE_VB_PARALLELISM && !releaseVBOutOfOrder) {
			// Control how many batches can be processed in parallel. Avoid dead lock due to OOM on loaders
			TraceEvent("FastRestoreControllerDispatchVersionBatches")
			    .detail("WaitOnRunningVersionBatches", self->runningVersionBatches.get());
			wait(self->runningVersionBatches.onChange());
		}
		int batchIndex = versionBatch->batchIndex;
		TraceEvent("FastRestoreControllerDispatchVersionBatches")
		    .detail("BatchIndex", batchIndex)
		    .detail("BatchSize", versionBatch->size)
		    .detail("RunningVersionBatches", self->runningVersionBatches.get())
		    .detail("VersionBatches", versionBatches.size());
		// The per-batch data/status objects must exist before the batch actor starts; it reads them on entry.
		self->batch[batchIndex] = makeReference<ControllerBatchData>();
		self->batchStatus[batchIndex] = makeReference<ControllerBatchStatus>();
		fBatches.push_back(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, *versionBatch));
		// Wait a bit to give the current version batch a head start from the next version batch
		wait(delay(SERVER_KNOBS->FASTRESTORE_VB_LAUNCH_DELAY));
	}

	try {
		wait(waitForAll(fBatches));
	} catch (Error& e) {
		TraceEvent(SevError, "FastRestoreControllerDispatchVersionBatchesUnexpectedError").error(e);
	}

	// Report the resolved version; the requested one is kept in the trace for reference.
	TraceEvent("FastRestoreController")
	    .detail("RestoreToVersion", targetVersion)
	    .detail("RequestedVersion", request.targetVersion);
	return targetVersion;
}

// Asks the loaders to load either the range files (isRangeFile == true) or the log files
// (isRangeFile == false) of one version batch.
// Files are handed to loaders round-robin, starting from a randomly chosen loader. For every file a
// LoadingParam / RestoreAsset is built, its status in batchStatus->raStatus is set to Loading, and one
// RestoreLoadFileRequest is queued. After all replies have arrived, each asset's status is advanced to
// Loaded and checked; inconsistencies are reported through SevError/SevWarn trace events and do not
// abort the actor.
// NOTE(review): the batchData and cx parameters are not referenced in this body.
ACTOR static Future<Void> loadFilesOnLoaders(Reference<ControllerBatchData> batchData,
                                             Reference<ControllerBatchStatus> batchStatus,
                                             std::map<UID, RestoreLoaderInterface> loadersInterf,
                                             int batchIndex,
                                             Database cx,
                                             RestoreRequest request,
                                             VersionBatch versionBatch,
                                             bool isRangeFile) {
	// set is internally sorted
	// `files` is a non-state local and is only used before the wait below.
	std::set<RestoreFileFR>* files = isRangeFile ? &versionBatch.rangeFiles : &versionBatch.logFiles;

	// NOTE(review): `files` is the address of a member of versionBatch, so the nullptr branch below
	// looks unreachable — confirm.
	TraceEvent("FastRestoreControllerPhaseLoadFilesStart")
	    .detail("RestoreRequestID", request.randomUid)
	    .detail("BatchIndex", batchIndex)
	    .detail("FileTypeLoadedInVersionBatch", isRangeFile)
	    .detail("BeginVersion", versionBatch.beginVersion)
	    .detail("EndVersion", versionBatch.endVersion)
	    .detail("Files", (files != nullptr ? files->size() : -1));

	std::vector<std::pair<UID, RestoreLoadFileRequest>> requests;
	std::map<UID, RestoreLoaderInterface>::iterator loader = loadersInterf.begin();
	state std::vector<RestoreAsset> assets; // all assets loaded, used for sanity check restore progress

	// Balance workload on loaders for parsing range and log files across version batches
	// by starting the round-robin at a random loader.
	int random = deterministicRandom()->randomInt(0, loadersInterf.size());
	while (random-- > 0) {
		loader++;
	}

	int paramIdx = 0;
	for (auto& file : *files) {
		// TODO: Allow empty files in version batch; Filter out them here.
		// Wrap around to the first loader once the last one has been used.
		if (loader == loadersInterf.end()) {
			loader = loadersInterf.begin();
		}
		// Prepare loading
		LoadingParam param;
		param.url = request.url;
		param.isRangeFile = file.isRange;
		param.rangeVersion = file.isRange ? file.version : -1;
		param.blockSize = file.blockSize;

		// The asset covers the whole file (offset 0, len == fileSize) and gets a fresh random uid.
		param.asset.uid = deterministicRandom()->randomUniqueID();
		param.asset.filename = file.fileName;
		param.asset.fileIndex = file.fileIndex;
		param.asset.partitionId = file.partitionId;
		param.asset.offset = 0;
		param.asset.len = file.fileSize;
		param.asset.range = request.range;
		param.asset.beginVersion = versionBatch.beginVersion;
		// For log files with an explicit target version (request.targetVersion != -1), the end version is
		// capped at targetVersion + 1; range files and "no explicit target" use the batch's end version.
		param.asset.endVersion = (isRangeFile || request.targetVersion == -1)
		                             ? versionBatch.endVersion
		                             : std::min(versionBatch.endVersion, request.targetVersion + 1);
		param.asset.addPrefix = request.addPrefix;
		param.asset.removePrefix = request.removePrefix;
		param.asset.batchIndex = batchIndex;

		TraceEvent("FastRestoreControllerPhaseLoadFiles")
		    .detail("BatchIndex", batchIndex)
		    .detail("LoadParamIndex", paramIdx)
		    .detail("LoaderID", loader->first.toString())
		    .detail("LoadParam", param.toString());
		ASSERT_WE_THINK(param.asset.len > 0);
		ASSERT_WE_THINK(param.asset.offset >= 0);
		ASSERT_WE_THINK(param.asset.offset <= file.fileSize);
		ASSERT_WE_THINK(param.asset.beginVersion <= param.asset.endVersion);

		requests.emplace_back(loader->first, RestoreLoadFileRequest(batchIndex, param));
		// Restore asset should only be loaded exactly once.
		if (batchStatus->raStatus.find(param.asset) != batchStatus->raStatus.end()) {
			TraceEvent(SevError, "FastRestoreControllerPhaseLoadFiles")
			    .detail("LoadingParam", param.toString())
			    .detail("RestoreAssetAlreadyProcessed", batchStatus->raStatus[param.asset]);
		}
		batchStatus->raStatus[param.asset] = RestoreAssetStatus::Loading;
		assets.push_back(param.asset);
		++loader;
		++paramIdx;
	}
	// One load parameter is expected per file; a mismatch is reported at SevError.
	TraceEvent(files->size() != paramIdx ? SevError : SevInfo, "FastRestoreControllerPhaseLoadFiles")
	    .detail("BatchIndex", batchIndex)
	    .detail("Files", files->size())
	    .detail("LoadParams", paramIdx);

	state std::vector<RestoreLoadFileReply> replies;
	// Wait on the batch of load files or log files
	wait(getBatchReplies(
	    &RestoreLoaderInterface::loadFile, loadersInterf, requests, &replies, TaskPriority::RestoreLoaderLoadFiles));

	TraceEvent("FastRestoreControllerPhaseLoadFilesReply")
	    .detail("BatchIndex", batchIndex)
	    .detail("SamplingReplies", replies.size());
	for (auto& reply : replies) {
		// Update and sanity check restore asset's status
		RestoreAssetStatus status = batchStatus->raStatus[reply.param.asset];
		if (status == RestoreAssetStatus::Loading && !reply.isDuplicated) {
			// Normal case: first reply for an asset that was being loaded.
			batchStatus->raStatus[reply.param.asset] = RestoreAssetStatus::Loaded;
		} else if (status == RestoreAssetStatus::Loading && reply.isDuplicated) {
			// Duplicate request wait on the restore asset to be processed before it replies
			batchStatus->raStatus[reply.param.asset] = RestoreAssetStatus::Loaded;
			TraceEvent(SevWarn, "FastRestoreControllerPhaseLoadFilesReply")
			    .detail("RestoreAsset", reply.param.asset.toString())
			    .detail("DuplicateRequestArriveEarly", "RestoreAsset should have been processed");
		} else if (status == RestoreAssetStatus::Loaded && reply.isDuplicated) {
			// Already marked Loaded; the duplicate reply carries no new information.
			TraceEvent(SevDebug, "FastRestoreControllerPhaseLoadFilesReply")
			    .detail("RestoreAsset", reply.param.asset.toString())
			    .detail("RequestIgnored", "Loading request was sent more than once");
		} else {
			// Any other status/duplicate combination is unexpected.
			TraceEvent(SevError, "FastRestoreControllerPhaseLoadFilesReply")
			    .detail("RestoreAsset", reply.param.asset.toString())
			    .detail("UnexpectedReply", reply.toString());
		}
	}

	// Sanity check: all restore assets status should be Loaded
	for (auto& asset : assets) {
		if (batchStatus->raStatus[asset] != RestoreAssetStatus::Loaded) {
			TraceEvent(SevError, "FastRestoreControllerPhaseLoadFilesReply")
			    .detail("RestoreAsset", asset.toString())
			    .detail("UnexpectedStatus", batchStatus->raStatus[asset]);
		}
	}

	TraceEvent("FastRestoreControllerPhaseLoadFilesDone")
	    .detail("BatchIndex", batchIndex)
	    .detail("FileTypeLoadedInVersionBatch", isRangeFile)
	    .detail("BeginVersion", versionBatch.beginVersion)
	    .detail("EndVersion", versionBatch.endVersion);
	return Void();
}

// Ask loaders to send their buffered mutations to appliers.
// Every loader in loadersInterf receives a RestoreSendMutationsToAppliersRequest carrying the batch index,
// batchData->rangeToApplier and the useRangeFile flag, and its entry in batchStatus->loadStatus is set to
// SendingRanges (useRangeFile) or SendingLogs. Completes once getBatchReplies has completed; the replies
// themselves are not inspected.
ACTOR static Future<Void> sendMutationsFromLoaders(Reference<ControllerBatchData> batchData,
                                                   Reference<ControllerBatchStatus> batchStatus,
                                                   std::map<UID, RestoreLoaderInterface> loadersInterf,
                                                   int batchIndex,
                                                   bool useRangeFile) {
	TraceEvent("FastRestoreControllerPhaseSendMutationsFromLoadersStart")
	    .detail("BatchIndex", batchIndex)
	    .detail("UseRangeFiles", useRangeFile)
	    .detail("Loaders", loadersInterf.size());

	// The in-flight status is the same for every loader, so decide it once.
	const auto inFlightStatus = useRangeFile ? RestoreSendStatus::SendingRanges : RestoreSendStatus::SendingLogs;

	std::vector<std::pair<UID, RestoreSendMutationsToAppliersRequest>> sendRequests;
	for (auto& loaderEntry : loadersInterf) {
		const UID& loaderID = loaderEntry.first;
		sendRequests.emplace_back(
		    loaderID, RestoreSendMutationsToAppliersRequest(batchIndex, batchData->rangeToApplier, useRangeFile));
		batchStatus->loadStatus[loaderID] = inFlightStatus;
	}

	state std::vector<RestoreCommonReply> ackReplies;
	wait(getBatchReplies(&RestoreLoaderInterface::sendMutations,
	                     loadersInterf,
	                     sendRequests,
	                     &ackReplies,
	                     TaskPriority::RestoreLoaderSendMutations));

	TraceEvent("FastRestoreControllerPhaseSendMutationsFromLoadersDone")
	    .detail("BatchIndex", batchIndex)
	    .detail("UseRangeFiles", useRangeFile)
	    .detail("Loaders", loadersInterf.size());

	return Void();
}

// Process a version batch. Phases (loading files, send mutations) should execute in order:
//   1) wait until the batch is schedulable (isSchedulable) and initialize the batch on appliers/loaders;
//   2) load the batch's log files and range files on the loaders;
//   3) split the key space among appliers based on the collected samples (splitKeyRangeForAppliers);
//   4) ask loaders to send the parsed log and range mutations to the appliers;
//   5) notify appliers to apply the mutations, then notify loaders that the batch is finished.
// self->runningVersionBatches is incremented on entry and decremented after step 5.
// NOTE(review): the decrement is only reached when every wait succeeds; if one throws, the counter
// appears to stay incremented — confirm whether that is intended.
ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreControllerData> self,
                                                            int batchIndex,
                                                            Database cx,
                                                            RestoreRequest request,
                                                            VersionBatch versionBatch) {
	// The per-batch data and status entries are read from self at entry.
	state Reference<ControllerBatchData> batchData = self->batch[batchIndex];
	state Reference<ControllerBatchStatus> batchStatus = self->batchStatus[batchIndex];
	state double startTime = now();

	TraceEvent("FastRestoreControllerDispatchVersionBatchesStart", self->id())
	    .detail("BatchIndex", batchIndex)
	    .detail("BatchSize", versionBatch.size)
	    .detail("RunningVersionBatches", self->runningVersionBatches.get());

	self->runningVersionBatches.set(self->runningVersionBatches.get() + 1);

	// In case sampling data takes too much memory on controller
	wait(isSchedulable(self, batchIndex, __FUNCTION__));

	wait(initializeVersionBatch(self->appliersInterf, self->loadersInterf, batchIndex));

	ASSERT(!versionBatch.isEmpty());
	ASSERT(self->loadersInterf.size() > 0);
	ASSERT(self->appliersInterf.size() > 0);

	// Sanity checks: the batch must start from a clean state (no samples, no asset/load/apply status).
	// TODO: Allow loading both range and log files in parallel
	// NOTE(review): an earlier comment here said log files are parsed and sent before range files are
	// parsed, but the wait below combines both loads in a single expression — confirm the intended order.
	ASSERT(batchData->samples.empty());
	ASSERT(batchData->samplesSize < 1 && batchData->samplesSize > -1); // samplesSize should be 0
	ASSERT(batchStatus->raStatus.empty());
	ASSERT(batchStatus->loadStatus.empty());
	ASSERT(batchStatus->applyStatus.empty());

	// New backup has subversion to order mutations at the same version. For mutations at the same version,
	// range file's mutations have the largest subversion and larger than log file's.
	// SOMEDAY: Extend subversion to old-style backup.
	// Log files (last argument false) and range files (true) are loaded via two calls joined with &&.
	wait(
	    loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, false) &&
	    loadFilesOnLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, cx, request, versionBatch, true));

	// rangeToApplier is filled in by splitKeyRangeForAppliers and must still be empty here.
	ASSERT(batchData->rangeToApplier.empty());
	splitKeyRangeForAppliers(batchData, self->appliersInterf, batchIndex);

	// Ask loaders to send parsed mutations to appliers;
	// log mutations should be applied before range mutations at the same version, which is ensured by LogMessageVersion
	wait(sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, false) &&
	     sendMutationsFromLoaders(batchData, batchStatus, self->loadersInterf, batchIndex, true));

	// Synchronization point for version batch pipelining.
	// self->finishedBatch will continuously increase by 1 per version batch.
	wait(notifyApplierToApplyMutations(batchData, batchStatus, self->appliersInterf, batchIndex, &self->finishedBatch));

	wait(notifyLoadersVersionBatchFinished(self->loadersInterf, batchIndex));

	self->runningVersionBatches.set(self->runningVersionBatches.get() - 1);

	// Trigger checkMemory when there are delayed actors, now that this batch has finished.
	if (self->delayedActors > 0) {
		self->checkMemory.trigger();
	}

	TraceEvent("FastRestoreControllerDispatchVersionBatchesDone", self->id())
	    .detail("BatchIndex", batchIndex)
	    .detail("BatchSize", versionBatch.size)
	    .detail("RunningVersionBatches", self->runningVersionBatches.get())
	    .detail("Latency", now() - startTime);

	return Void();
}

// Partition the key space among the appliers according to the sampled data.
// Input: batchData->samples and batchData->samplesSize
// Output: batchData->rangeToApplier, keyed by the begin key of the range each applier takes
// batchData->samples is cleared before returning.
void splitKeyRangeForAppliers(Reference<ControllerBatchData> batchData,
                              std::map<UID, RestoreApplierInterface> appliersInterf,
                              int batchIndex) {
	ASSERT(batchData->samplesSize >= 0);
	// Sanity check: samples should not be used after freed
	ASSERT((batchData->samplesSize > 0 && !batchData->samples.empty()) ||
	       (batchData->samplesSize == 0 && batchData->samples.empty()));

	const int numAppliers = appliersInterf.size();
	// Amount of sampled data one applier should take; never less than 1
	const double slotSize = std::max(batchData->samplesSize / numAppliers, 1.0);
	TraceEvent("FastRestoreControllerPhaseCalculateApplierKeyRangesStart")
	    .detail("BatchIndex", batchIndex)
	    .detail("SamplingSize", batchData->samplesSize)
	    .detail("SlotSize", slotSize);

	// Ordered and de-duplicated begin keys of the applier key ranges
	std::set<Key> splitPoints;
	double cumulativeSize = slotSize;
	splitPoints.insert(normalKeys.begin); // First slot
	TraceEvent("FastRestoreControllerPhaseCalculateApplierKeyRanges")
	    .detail("BatchIndex", batchIndex)
	    .detail("CumulativeSize", cumulativeSize)
	    .detail("Slot", 0)
	    .detail("LowerBoundKey", normalKeys.begin);

	// Walk the samples one slot at a time; the sample found at each cumulative size starts the next range
	for (int slot = 1; cumulativeSize < batchData->samplesSize; ++slot, cumulativeSize += slotSize) {
		IndexedSet<Key, int64_t>::iterator boundary = batchData->samples.index(cumulativeSize);
		if (boundary == batchData->samples.end()) {
			break;
		}
		splitPoints.insert(*boundary);
		TraceEvent("FastRestoreControllerPhaseCalculateApplierKeyRanges")
		    .detail("BatchIndex", batchIndex)
		    .detail("CumulativeSize", cumulativeSize)
		    .detail("Slot", slot)
		    .detail("LowerBoundKey", boundary->toString());
	}

	// Report a mismatch between the number of ranges and the number of appliers
	const size_t numSplits = splitPoints.size();
	if (numSplits < numAppliers) {
		TraceEvent(SevWarnAlways, "FastRestoreControllerPhaseCalculateApplierKeyRanges")
		    .detail("NotAllAppliersAreUsed", numSplits)
		    .detail("NumAppliers", numAppliers);
	} else if (numSplits > numAppliers) {
		// Exactly one extra split point is only a warning; anything beyond that is logged as an error
		TraceEvent((numSplits == numAppliers + 1) ? SevWarn : SevError,
		           "FastRestoreControllerPhaseCalculateApplierKeyRanges")
		    .detail("TooManySlotsThanAppliers", numSplits)
		    .detail("NumAppliers", numAppliers)
		    .detail("SamplingSize", batchData->samplesSize)
		    .detail("PerformanceMayDegrade", "Last applier handles more data than others");
	}

	// Pair split points with appliers in order until either sequence runs out
	batchData->rangeToApplier.clear();
	auto applierIt = appliersInterf.begin();
	for (auto splitIt = splitPoints.begin(); applierIt != appliersInterf.end() && splitIt != splitPoints.end();
	     ++applierIt, ++splitIt) {
		batchData->rangeToApplier[*splitIt] = applierIt->first;
	}

	ASSERT(batchData->rangeToApplier.size() > 0);
	ASSERT(batchData->sanityCheckApplierKeyRange());
	batchData->logApplierKeyRange(batchIndex);
	TraceEvent("FastRestoreControllerPhaseCalculateApplierKeyRangesDone")
	    .detail("BatchIndex", batchIndex)
	    .detail("SamplingSize", batchData->samplesSize)
	    .detail("SlotSize", slotSize);
	batchData->samples.clear();
}

// Read every pending restore request stored under restoreRequestKeys.
// restoreRequestTriggerKey must already be set; this is asserted on each attempt.
// Retries until at least one request is found and returns the decoded requests.
ACTOR static Future<std::vector<RestoreRequest>> collectRestoreRequests(Database cx) {
	state std::vector<RestoreRequest> restoreRequests;
	state ReadYourWritesTransaction tr(cx);

	// restoreRequestTriggerKey should already been set
	loop {
		try {
			TraceEvent("FastRestoreControllerPhaseCollectRestoreRequestsWait").log();
			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);

			// Sanity check
			Optional<Value> numRequests = wait(tr.get(restoreRequestTriggerKey));
			ASSERT(numRequests.present());

			RangeResult restoreRequestValues = wait(tr.getRange(restoreRequestKeys, CLIENT_KNOBS->TOO_MANY));
			ASSERT(!restoreRequestValues.more);
			if (restoreRequestValues.size()) {
				for (auto& it : restoreRequestValues) {
					restoreRequests.push_back(decodeRestoreRequestValue(it.value));
					TraceEvent("FastRestoreControllerPhaseCollectRestoreRequests")
					    .detail("RestoreRequest", restoreRequests.back().toString());
				}
				break;
			} else {
				TraceEvent(SevError, "FastRestoreControllerPhaseCollectRestoreRequestsEmptyRequests").log();
				wait(delay(5.0));
				// Start from a fresh transaction so that the next attempt does not re-read the
				// result this transaction has already observed. The options are set again at the
				// top of the loop.
				tr.reset();
			}
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}

	return restoreRequests;
}

// Collect the descriptions of the backup files needed by the restore request from backup container bc.
// Output: *rangeFiles and *logFiles (both must be empty on entry) receive the de-duplicated, non-empty range and
//         log files; *minRangeVersion receives the smallest version among the range files, or 0 if there is none.
// Range (log) files are only collected when FASTRESTORE_USE_RANGE_FILE (FASTRESTORE_USE_LOG_FILE) is set.
// Returns the restore target version; when request.targetVersion is invalidVersion, the backup's max
// restorable version is used if it is present.
// Throws restore_missing_data() if the backup container has no restorable file set for the target version.
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
                                                std::vector<RestoreFileFR>* rangeFiles,
                                                std::vector<RestoreFileFR>* logFiles,
                                                Version* minRangeVersion,
                                                Database cx,
                                                RestoreRequest request) {
	state BackupDescription desc = wait(bc->describeBackup());

	// Convert version to real time for operators to read the BackupDescription desc.
	wait(desc.resolveVersionTimes(cx));

	if (request.targetVersion == invalidVersion && desc.maxRestorableVersion.present()) {
		request.targetVersion = desc.maxRestorableVersion.get();
	}

	TraceEvent("FastRestoreControllerPhaseCollectBackupFilesStart")
	    .detail("TargetVersion", request.targetVersion)
	    .detail("BackupDesc", desc.toString())
	    .detail("UseRangeFile", SERVER_KNOBS->FASTRESTORE_USE_RANGE_FILE)
	    .detail("UseLogFile", SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE);
	if (g_network->isSimulated()) {
		std::cout << "Restore to version: " << request.targetVersion << "\nBackupDesc: \n" << desc.toString() << "\n\n";
	}

	state VectorRef<KeyRangeRef> restoreRanges;
	restoreRanges.add(request.range);
	Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(request.targetVersion, restoreRanges));

	if (!restorable.present()) {
		TraceEvent(SevWarn, "FastRestoreControllerPhaseCollectBackupFiles")
		    .detail("NotRestorable", request.targetVersion);
		throw restore_missing_data();
	}

	ASSERT(rangeFiles->empty());
	ASSERT(logFiles->empty());

	// The sets drop duplicated files; byte totals only count files that are actually kept
	std::set<RestoreFileFR> uniqueRangeFiles;
	std::set<RestoreFileFR> uniqueLogFiles;
	double rangeSize = 0;
	double logSize = 0;
	*minRangeVersion = MAX_VERSION;
	if (SERVER_KNOBS->FASTRESTORE_USE_RANGE_FILE) {
		for (const RangeFile& f : restorable.get().ranges) {
			TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
			    .detail("RangeFile", f.toString());
			if (f.fileSize <= 0) {
				continue; // Skip empty files
			}
			RestoreFileFR file(f);
			TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
			    .detail("RangeFileFR", file.toString());
			*minRangeVersion = std::min(*minRangeVersion, file.version);
			if (uniqueRangeFiles.insert(file).second) {
				rangeSize += file.fileSize;
			}
		}
	}
	if (MAX_VERSION == *minRangeVersion) {
		*minRangeVersion = 0; // If no range file, range version must be 0 so that we apply all mutations
	}

	if (SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE) {
		for (const LogFile& f : restorable.get().logs) {
			TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles").detail("LogFile", f.toString());
			if (f.fileSize <= 0) {
				continue; // Skip empty files
			}
			RestoreFileFR file(f);
			TraceEvent(SevFRDebugInfo, "FastRestoreControllerPhaseCollectBackupFiles")
			    .detail("LogFileFR", file.toString());
			if (uniqueLogFiles.insert(file).second) {
				logSize += file.fileSize;
			}
		}
	}

	// Assign unique range files and log files to output
	rangeFiles->assign(uniqueRangeFiles.begin(), uniqueRangeFiles.end());
	logFiles->assign(uniqueLogFiles.begin(), uniqueLogFiles.end());

	TraceEvent("FastRestoreControllerPhaseCollectBackupFilesDone")
	    .detail("BackupDesc", desc.toString())
	    .detail("RangeFiles", rangeFiles->size())
	    .detail("LogFiles", logFiles->size())
	    .detail("RangeFileBytes", rangeSize)
	    .detail("LogFileBytes", logSize)
	    .detail("UseRangeFile", SERVER_KNOBS->FASTRESTORE_USE_RANGE_FILE)
	    .detail("UseLogFile", SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE);
	return request.targetVersion;
}

// Ask bc for the key range covered by the range file *file, then raise the version of every range of
// pRangeVersions inside that key range to at least file->version.
// All ranges of pRangeVersions are traced afterwards at SevDebug.
ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersions,
                                             RestoreFileFR* file,
                                             Reference<IBackupContainer> bc) {
	TraceEvent("FastRestoreControllerDecodeRangeVersion").detail("File", file->toString());
	RangeFile snapshotFile = { file->version, (uint32_t)file->blockSize, file->fileName, file->fileSize };

	// The file's range goes from its first key to its last key; the end key is exclusive
	KeyRange coveredRange = wait(bc->getSnapshotFileKeyRange(snapshotFile));
	TraceEvent("FastRestoreControllerInsertRangeVersion")
	    .detail("DecodedRangeFile", file->fileName)
	    .detail("KeyRange", coveredRange)
	    .detail("Version", file->version);

	// Keep the highest version seen so far for each range covered by this file
	auto touchedRanges = pRangeVersions->modify(coveredRange);
	for (auto it = touchedRanges.begin(); it != touchedRanges.end(); ++it) {
		if (it->value() < file->version) {
			it->value() = file->version;
		}
	}

	// Trace the whole map after the update
	auto allRanges = pRangeVersions->ranges();
	int rangeIndex = 0;
	for (auto it = allRanges.begin(); it != allRanges.end(); ++it) {
		TraceEvent(SevDebug, "RangeVersionsAfterUpdate")
		    .detail("File", file->toString())
		    .detail("FileRange", coveredRange.toString())
		    .detail("FileVersion", file->version)
		    .detail("RangeIndex", rangeIndex)
		    .detail("RangeBegin", it->begin())
		    .detail("RangeEnd", it->end())
		    .detail("RangeVersion", it->value());
		++rangeIndex;
	}

	return Void();
}

// Build the version skyline of snapshot ranges by calling insertRangeVersion for every file in *pRangeFiles.
// Expensive and slow operation that should not run in real prod: outside simulation this only logs an
// error and returns without touching pRangeVersions.
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
                                             std::vector<RestoreFileFR>* pRangeFiles,
                                             Key url) {
	if (!g_network->isSimulated()) {
		TraceEvent(SevError, "ExpensiveBuildRangeVersions")
		    .detail("Reason", "Parsing all range files is slow and memory intensive");
		return Void();
	}

	Reference<IBackupContainer> container = IBackupContainer::openContainer(url.toString());

	// Start one insertRangeVersion actor per range file, then wait for all of them
	state std::vector<Future<Void>> pendingInserts;
	for (RestoreFileFR& rangeFile : *pRangeFiles) {
		pendingInserts.push_back(insertRangeVersion(pRangeVersions, &rangeFile, container));
	}
	wait(waitForAll(pendingInserts));

	return Void();
}

// Clear all keys in normalKeys through runRYWTransaction, with the ACCESS_SYSTEM_KEYS and LOCK_AWARE
// transaction options set.
ACTOR static Future<Void> clearDB(Database cx) {
	Future<Void> cleared = runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		tr->setOption(FDBTransactionOptions::LOCK_AWARE);
		tr->clear(normalKeys);
		return Void();
	});
	wait(cleared);

	return Void();
}

// Send an initVersionBatch request for batchIndex to every applier, wait for that round to finish,
// then do the same for every loader.
ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInterface> appliersInterf,
                                                 std::map<UID, RestoreLoaderInterface> loadersInterf,
                                                 int batchIndex) {
	// Round 1: appliers
	TraceEvent("FastRestoreControllerPhaseInitVersionBatchForAppliersStart")
	    .detail("BatchIndex", batchIndex)
	    .detail("Appliers", appliersInterf.size());
	std::vector<std::pair<UID, RestoreVersionBatchRequest>> applierRequests;
	applierRequests.reserve(appliersInterf.size());
	for (const auto& applierEntry : appliersInterf) {
		applierRequests.emplace_back(applierEntry.first, RestoreVersionBatchRequest(batchIndex));
	}
	wait(sendBatchRequests(&RestoreApplierInterface::initVersionBatch, appliersInterf, applierRequests));

	// Round 2: loaders
	TraceEvent("FastRestoreControllerPhaseInitVersionBatchForLoaders")
	    .detail("BatchIndex", batchIndex)
	    .detail("Loaders", loadersInterf.size());
	std::vector<std::pair<UID, RestoreVersionBatchRequest>> loaderRequests;
	loaderRequests.reserve(loadersInterf.size());
	for (const auto& loaderEntry : loadersInterf) {
		loaderRequests.emplace_back(loaderEntry.first, RestoreVersionBatchRequest(batchIndex));
	}
	wait(sendBatchRequests(&RestoreLoaderInterface::initVersionBatch, loadersInterf, loaderRequests));

	TraceEvent("FastRestoreControllerPhaseInitVersionBatchForAppliersDone").detail("BatchIndex", batchIndex);
	return Void();
}

// Repeatedly tell each applier the write rate it may use, proportional to the share of remaining data
// (remainMB) that the applier reported in the previous round.
// The loop has no exit condition.
// NOTE(review): presumably the caller stops this actor by dropping the returned future -- confirm.
ACTOR static Future<Void> updateApplierWriteBW(Reference<ControllerBatchData> batchData,
                                               std::map<UID, RestoreApplierInterface> appliersInterf,
                                               int batchIndex) {
	state std::unordered_map<UID, double> remainMBByApplier;
	state double totalRemainMB = SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB;
	// Rate given to an applier when the total bandwidth is divided by the configured number of appliers
	state double evenShareBW = SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS;
	state int round = 0;
	state std::vector<RestoreUpdateRateReply> replies;
	state std::vector<std::pair<UID, RestoreUpdateRateRequest>> requests;

	// Before any reply arrives every applier is assumed to hold an even share
	for (auto& applier : appliersInterf) {
		remainMBByApplier[applier.first] = evenShareBW;
	}

	loop {
		// Build one rate request per applier
		requests.clear();
		for (auto& applier : appliersInterf) {
			double writeRate = evenShareBW;
			if (totalRemainMB > 1) {
				writeRate =
				    (remainMBByApplier[applier.first] / totalRemainMB) * SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB;
			}
			requests.emplace_back(applier.first, RestoreUpdateRateRequest(batchIndex, writeRate));
		}

		replies.clear();
		wait(getBatchReplies(
		    &RestoreApplierInterface::updateRate,
		    appliersInterf,
		    requests,
		    &replies,
		    TaskPriority::DefaultEndpoint)); // DefaultEndpoint has higher priority than fast restore endpoints
		ASSERT(replies.size() == requests.size());

		// Record what each applier reported; replies[i] is matched with requests[i]
		totalRemainMB = 0;
		for (int i = 0; i < replies.size(); i++) {
			remainMBByApplier[requests[i].first] = replies[i].remainMB;
			totalRemainMB += replies[i].remainMB;
		}
		ASSERT(totalRemainMB >= 0);

		// First loop: Need to update writeRate quicker
		const bool firstRound = (round == 0);
		round++;
		double delayTime = SERVER_KNOBS->FASTRESTORE_RATE_UPDATE_SECONDS;
		if (firstRound) {
			delayTime = 0.2;
		}
		wait(delay(delayTime));
	}
}

// Ask each applier to apply its received mutations to DB
// NOTE: Controller cannot start applying mutations at batchIndex until all appliers have applied for (batchIndex - 1)
//       because appliers at different batchIndex may have overlapped key ranges.
// Waits until *finishedBatch has reached batchIndex - 1. If it is exactly batchIndex - 1, sends an applyToDB
// request to every applier, waits for the replies, records the per-applier status in batchStatus->applyStatus
// and finally sets *finishedBatch to batchIndex. If *finishedBatch is already past batchIndex - 1, nothing is
// sent.
ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<ControllerBatchData> batchData,
                                                        Reference<ControllerBatchStatus> batchStatus,
                                                        std::map<UID, RestoreApplierInterface> appliersInterf,
                                                        int batchIndex,
                                                        NotifiedVersion* finishedBatch) {
	TraceEvent("FastRestoreControllerPhaseApplyToDBStart")
	    .detail("BatchIndex", batchIndex)
	    .detail("FinishedBatch", finishedBatch->get());

	// Wait for the previous version batch to be applied
	wait(finishedBatch->whenAtLeast(batchIndex - 1));

	// Holds the updateApplierWriteBW actor started below.
	// NOTE(review): presumably that actor is stopped when this future is destroyed -- confirm.
	state Future<Void> updateRate;

	if (finishedBatch->get() == batchIndex - 1) {
		// Prepare the applyToDB requests
		std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;

		TraceEvent("FastRestoreControllerPhaseApplyToDBRunning")
		    .detail("BatchIndex", batchIndex)
		    .detail("Appliers", appliersInterf.size());
		for (auto& applier : appliersInterf) {
			// No applier may already have an apply status for this batch
			ASSERT(batchStatus->applyStatus.find(applier.first) == batchStatus->applyStatus.end());
			requests.emplace_back(applier.first, RestoreVersionBatchRequest(batchIndex));
			batchStatus->applyStatus[applier.first] = RestoreApplyStatus::Applying;
		}
		state std::vector<RestoreCommonReply> replies;
		// The actor at each batchIndex should only occur once.
		// Use batchData->applyToDB just in case the actor at a batchIndex is executed more than once.
		if (!batchData->applyToDB.present()) {
			batchData->applyToDB = Never();
			batchData->applyToDB = getBatchReplies(&RestoreApplierInterface::applyToDB,
			                                       appliersInterf,
			                                       requests,
			                                       &replies,
			                                       TaskPriority::RestoreApplierWriteDB);
			// Adjust the appliers' write rates while they are applying
			updateRate = updateApplierWriteBW(batchData, appliersInterf, batchIndex);
		} else {
			TraceEvent(SevError, "FastRestoreControllerPhaseApplyToDB")
			    .detail("BatchIndex", batchIndex)
			    .detail("Attention", "Actor should not be invoked twice for the same batch index");
		}

		ASSERT(batchData->applyToDB.present());
		ASSERT(!batchData->applyToDB.get().isError());
		wait(batchData->applyToDB.get());

		// Sanity check all appliers have applied data to destination DB
		for (auto& reply : replies) {
			if (batchStatus->applyStatus[reply.id] == RestoreApplyStatus::Applying) {
				batchStatus->applyStatus[reply.id] = RestoreApplyStatus::Applied;
				if (reply.isDuplicated) {
					TraceEvent(SevWarn, "FastRestoreControllerPhaseApplyToDB")
					    .detail("Applier", reply.id)
					    .detail("DuplicateRequestReturnEarlier", "Apply db request should have been processed");
				}
			}
		}
		// Report every applier that was not marked Applied by a reply above
		for (auto& applier : appliersInterf) {
			if (batchStatus->applyStatus[applier.first] != RestoreApplyStatus::Applied) {
				TraceEvent(SevError, "FastRestoreControllerPhaseApplyToDB")
				    .detail("Applier", applier.first)
				    .detail("ApplyStatus", batchStatus->applyStatus[applier.first]);
			}
		}
		// Mark this batch as finished, which is what the whenAtLeast() wait above waits on for batchIndex + 1
		finishedBatch->set(batchIndex);
	}

	TraceEvent("FastRestoreControllerPhaseApplyToDBDone")
	    .detail("BatchIndex", batchIndex)
	    .detail("FinishedBatch", finishedBatch->get());

	return Void();
}

// Send a finishVersionBatch request for batchIndex to every loader and wait for the batch of requests
// to complete; callers use this once the version batch has been applied to DB.
ACTOR static Future<Void> notifyLoadersVersionBatchFinished(std::map<UID, RestoreLoaderInterface> loadersInterf,
                                                            int batchIndex) {
	TraceEvent("FastRestoreControllerPhaseNotifyLoadersVersionBatchFinishedStart").detail("BatchIndex", batchIndex);

	// One request per loader
	std::vector<std::pair<UID, RestoreVersionBatchRequest>> finishRequests;
	finishRequests.reserve(loadersInterf.size());
	for (const auto& loaderEntry : loadersInterf) {
		finishRequests.emplace_back(loaderEntry.first, RestoreVersionBatchRequest(batchIndex));
	}
	wait(sendBatchRequests(&RestoreLoaderInterface::finishVersionBatch, loadersInterf, finishRequests));

	TraceEvent("FastRestoreControllerPhaseNotifyLoadersVersionBatchFinishedDone").detail("BatchIndex", batchIndex);
	return Void();
}

// Send a finishRestore request to every loader and every applier at the end of a restore request.
// The request carries the terminate flag; the replies are only awaited when terminate is false.
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreControllerData> self, bool terminate = false) {
	TraceEvent("FastRestoreControllerPhaseNotifyRestoreCompletedStart").log();

	// Loaders first
	std::vector<std::pair<UID, RestoreFinishRequest>> loaderRequests;
	for (const auto& loaderEntry : self->loadersInterf) {
		loaderRequests.emplace_back(loaderEntry.first, RestoreFinishRequest(terminate));
	}
	Future<Void> endLoaders =
	    sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, loaderRequests);

	// Then appliers
	std::vector<std::pair<UID, RestoreFinishRequest>> applierRequests;
	for (const auto& applierEntry : self->appliersInterf) {
		applierRequests.emplace_back(applierEntry.first, RestoreFinishRequest(terminate));
	}
	Future<Void> endAppliers =
	    sendBatchRequests(&RestoreApplierInterface::finishRestore, self->appliersInterf, applierRequests);

	// If terminate = true, loaders and appliers exit immediately after they receive the request, so the
	// controller may not receive acks and must not wait for them.
	if (!terminate) {
		wait(endLoaders && endAppliers);
	}

	TraceEvent("FastRestoreControllerPhaseNotifyRestoreCompletedDone").log();

	return Void();
}

// Signal the end of restore: tell the workers to terminate, then in one transaction clear the restore
// request keys and set restoreRequestDoneKey to a value built from the transaction's read version.
ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreControllerData> self, Database cx) {
	state Reference<ReadYourWritesTransaction> doneTr(new ReadYourWritesTransaction(cx));

	// Notify workers the restore has completed
	wait(notifyRestoreCompleted(self, true));

	// Give some time for loaders and appliers to exit
	wait(delay(5.0));

	// Notify tester that the restore has finished
	loop {
		try {
			doneTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			doneTr->setOption(FDBTransactionOptions::LOCK_AWARE);
			doneTr->clear(restoreRequestTriggerKey);
			doneTr->clear(restoreRequestKeys);
			Version doneVersion = wait(doneTr->getReadVersion());
			doneTr->set(restoreRequestDoneKey, restoreRequestDoneVersionValue(doneVersion));
			wait(doneTr->commit());
			break;
		} catch (Error& e) {
			wait(doneTr->onError(e));
		}
	}

	TraceEvent("FastRestoreControllerAllRestoreCompleted").log();

	return Void();
}

// Update the most recent time when controller receives heartbeat from each loader and applier.
// After the roles are recruited, this loops forever: it sends a heartbeat request to every loader and
// applier, waits for all replies or for FASTRESTORE_HEARTBEAT_DELAY (whichever comes first), and records
// now() in self->rolesHeartBeatTime for each role whose reply is ready and is not an error.
// TODO: Replace the heartbeat mechanism with FDB failure monitoring mechanism
ACTOR static Future<Void> updateHeartbeatTime(Reference<RestoreControllerData> self) {
	wait(self->recruitedRoles.getFuture());

	int numRoles = self->loadersInterf.size() + self->appliersInterf.size();
	state std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
	state std::map<UID, RestoreApplierInterface>::iterator applier = self->appliersInterf.begin();
	state std::vector<Future<RestoreCommonReply>> fReplies(numRoles, Never());
	state std::vector<UID> nodes;
	state int index = 0;
	state Future<Void> fTimeout = Void();

	// Initialize nodes only once: loaders first, then appliers, which is the order fReplies is filled in.
	// Iterating by const reference avoids copying each interface just to read its UID.
	nodes.reserve(numRoles);
	for (const auto& loaderEntry : self->loadersInterf) {
		nodes.push_back(loaderEntry.first);
	}
	for (const auto& applierEntry : self->appliersInterf) {
		nodes.push_back(applierEntry.first);
	}

	loop {
		loader = self->loadersInterf.begin();
		applier = self->appliersInterf.begin();
		index = 0;
		std::fill(fReplies.begin(), fReplies.end(), Never());
		// ping loaders and appliers
		while (loader != self->loadersInterf.end()) {
			fReplies[index] = loader->second.heartbeat.getReply(RestoreSimpleRequest());
			loader++;
			index++;
		}
		while (applier != self->appliersInterf.end()) {
			fReplies[index] = applier->second.heartbeat.getReply(RestoreSimpleRequest());
			applier++;
			index++;
		}

		fTimeout = delay(SERVER_KNOBS->FASTRESTORE_HEARTBEAT_DELAY);

		// Here we have to handle error, otherwise controller worker will fail and exit.
		try {
			wait(waitForAll(fReplies) || fTimeout);
		} catch (Error& e) {
			// This should be an ignorable error.
			TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "FastRestoreUpdateHeartbeatError").error(e);
		}

		// Update the most recent heart beat time for each role; fReplies[i] belongs to nodes[i]
		for (int i = 0; i < fReplies.size(); ++i) {
			if (!fReplies[i].isError() && fReplies[i].isReady()) {
				double currentTime = now();
				// emplace() inserts a new entry or returns the existing one; either way store the new time
				auto item = self->rolesHeartBeatTime.emplace(nodes[i], currentTime);
				item.first->second = currentTime;
			}
		}
		wait(fTimeout); // Ensure not updating heartbeat too quickly
	}
}

// Every FASTRESTORE_HEARTBEAT_MAX_DELAY seconds, log a warning for each role in self->rolesHeartBeatTime
// whose recorded heartbeat time is more than FASTRESTORE_HEARTBEAT_MAX_DELAY seconds old.
ACTOR static Future<Void> checkRolesLiveness(Reference<RestoreControllerData> self) {
	loop {
		wait(delay(SERVER_KNOBS->FASTRESTORE_HEARTBEAT_MAX_DELAY));
		for (auto it = self->rolesHeartBeatTime.begin(); it != self->rolesHeartBeatTime.end(); ++it) {
			const auto& roleID = it->first;
			const auto& lastAliveTime = it->second;
			if (now() - lastAliveTime > SERVER_KNOBS->FASTRESTORE_HEARTBEAT_MAX_DELAY) {
				TraceEvent(SevWarnAlways, "FastRestoreUnavailableRole", roleID)
				    .detail("Delta", now() - lastAliveTime)
				    .detail("LastAliveTime", lastAliveTime);
			}
		}
	}
}