Start batches in reverse order for testings and code cleanup

This commit is contained in:
Meng Xu 2020-01-17 11:06:07 -08:00
parent 4ac92d223b
commit 022783b449
7 changed files with 48 additions and 47 deletions

View File

@ -65,7 +65,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
}
when(RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture())) {
requestTypeStr = "initVersionBatch";
wait(handleInitVersionBatchRequest(req, self));
actors.add(handleInitVersionBatchRequest(req, self));
}
when(RestoreFinishRequest req = waitNext(applierInterf.finishRestore.getFuture())) {
requestTypeStr = "finishRestore";
@ -450,24 +450,26 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
// Ensure batch i is applied before batch (i+1)
wait(self->finishedBatch.whenAtLeast(req.batchIndex-1));
Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
ASSERT(batchData.isValid());
TraceEvent("FastRestore")
.detail("ApplierApplyToDB", self->id())
.detail("VersionBatchIndex", req.batchIndex)
.detail("DBApplierPresent", batchData->dbApplier.present());
if (!batchData->dbApplier.present()) {
batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx);
}
ASSERT(batchData->dbApplier.present());
wait(batchData->dbApplier.get());
if (self->finishedBatch.get() == req.batchIndex-1) {
Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
ASSERT(batchData.isValid());
TraceEvent("FastRestore")
.detail("ApplierApplyToDB", self->id())
.detail("VersionBatchIndex", req.batchIndex)
.detail("DBApplierPresent", batchData->dbApplier.present());
if (!batchData->dbApplier.present()) {
batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx);
}
ASSERT(batchData->dbApplier.present());
wait(batchData->dbApplier.get());
// Multiple actor invokation can wait on req.batchIndex-1;
// Avoid setting finishedBatch when finishedBatch > req.batchIndex
self->finishedBatch.set(req.batchIndex);
if (self->finishedBatch.get() == req.batchIndex-1) {
self->finishedBatch.set(req.batchIndex);
}
}
req.reply.send(RestoreCommonReply(self->id()));

View File

@ -86,7 +86,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
}
when(RestoreVersionBatchRequest req = waitNext(loaderInterf.initVersionBatch.getFuture())) {
requestTypeStr = "initVersionBatch";
wait(handleInitVersionBatchRequest(req, self));
actors.add(handleInitVersionBatchRequest(req, self));
}
when(RestoreFinishRequest req = waitNext(loaderInterf.finishRestore.getFuture())) {
requestTypeStr = "finishRestore";
@ -330,6 +330,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
// TODO: Add a unit test for this function
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
TraceEvent(SevWarn, "FastRestoreSplitMutation").detail("Mutation", m.toString());
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());

View File

@ -237,7 +237,6 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
state std::vector<RestoreFileFR> rangeFiles;
state std::vector<RestoreFileFR> logFiles;
state std::vector<RestoreFileFR> allFiles;
state std::map<Version, VersionBatch>::iterator versionBatch = self->versionBatches.begin();
self->initBackupContainer(request.url);
@ -253,15 +252,31 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches); // Divide files into version batches
self->dumpVersionBatches(self->versionBatches);
ASSERT(self->batchIndex == 1); // versionBatchIndex starts at 1 because NotifiedVersion starts at 0
std::vector<Future<Void>> fBatches;
// TODO: Control how many batches can be processed in parallel. Avoid dead lock due to OOM on loaders
for (versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) {
self->batch[self->batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
fBatches.push_back(
distributeWorkloadPerVersionBatch(self, self->batchIndex, cx, request, versionBatch->second));
// wait(distributeWorkloadPerVersionBatch(self, self->batchIndex, cx, request, versionBatch->second));
self->batchIndex++;
int batchIndex = 1; // versionBatchIndex starts at 1 because NotifiedVersion starts at 0
if (!g_network->isSimulated() || deterministicRandom()->random01() > 0.5) {
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("VersionBatchStart", batchIndex);
// TODO: Control how many batches can be processed in parallel. Avoid dead lock due to OOM on loaders
for (std::map<Version, VersionBatch>::iterator versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) {
self->batch[batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
fBatches.push_back(
distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
// wait(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
batchIndex++;
}
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("VersionBatchEnd", batchIndex);
} else {
batchIndex = self->versionBatches.size();
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("VersionBatchStart", batchIndex);
for (std::map<Version, VersionBatch>::reverse_iterator versionBatch = self->versionBatches.rbegin(); versionBatch != self->versionBatches.rend(); versionBatch++) {
self->batch[batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
fBatches.push_back(
distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
// wait(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
batchIndex--;
}
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("VersionBatchEnd", batchIndex);
ASSERT(batchIndex == 0);
}
wait(waitForAll(fBatches));
@ -293,8 +308,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<MasterBatchData> batchDat
std::map<UID, RestoreLoaderInterface>::iterator loader = loadersInterf.begin();
for (auto& file : *files) {
// NOTE: Cannot skip empty files because empty files, e.g., log file, still need to generate dummy mutation to
// drive applier's NotifiedVersion.
if (loader == loadersInterf.end()) {
loader = loadersInterf.begin();
}
@ -533,7 +546,7 @@ ACTOR static Future<Void> clearDB(Database cx) {
ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInterface> appliersInterf,
std::map<UID, RestoreLoaderInterface> loadersInterf, int batchIndex) {
TraceEvent("FastRestoreInitVersionBatch").detail("BatchIndex", batchIndex).detail("Appliers", appliersInterf.size()).detail("Loaders", loadersInterf.size());
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requestsToAppliers;
for (auto& applier : appliersInterf) {
requestsToAppliers.push_back(std::make_pair(applier.first, RestoreVersionBatchRequest(batchIndex)));

View File

@ -106,18 +106,9 @@ struct MasterBatchData : public ReferenceCounted<MasterBatchData> {
struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMasterData> {
std::map<Version, VersionBatch> versionBatches; // key is the beginVersion of the version batch
int batchIndex; // The largest index of in-progress version batchs
Reference<IBackupContainer> bc; // Backup container is used to read backup files
Key bcUrl; // The url used to get the bc
// // rangeToApplier is in master and loader node. Loader uses this to determine which applier a mutation should be
// sent.
// // KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
// std::map<Key, UID> rangeToApplier;
// IndexedSet<Key, int64_t> samples; // sample of range and log files
// double samplesSize; // sum of the metric of all samples
std::map<int, Reference<MasterBatchData>> batch;
void addref() { return ReferenceCounted<RestoreMasterData>::addref(); }
@ -126,7 +117,6 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
RestoreMasterData() {
role = RestoreRole::Master;
nodeID = UID();
batchIndex = 1; // starts with 1 because batchId (NotifiedVersion) in loaders and appliers start with 0
}
~RestoreMasterData() = default;
@ -141,18 +131,17 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
void resetPerRestoreRequest() {
TraceEvent("FastRestoreMasterReset").detail("OldVersionBatches", versionBatches.size());
versionBatches.clear();
batchIndex = 1;
batch.clear();
}
std::string describeNode() {
std::stringstream ss;
ss << "Master versionBatch:" << batchIndex;
ss << "Master";
return ss.str();
}
void dumpVersionBatches(const std::map<Version, VersionBatch>& versionBatches) {
int i = 0;
int i = 1;
for (auto& vb : versionBatches) {
TraceEvent("FastRestoreVersionBatches")
.detail("BatchIndex", i)

View File

@ -53,6 +53,7 @@ void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference<Resto
}
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self) {
TraceEvent("FastRestoreHandleInitVersionBatch", self->id()).detail("Role", getRoleStr(self->role)).detail("BatchIndex", req.batchIndex).detail("VersionBatchId", self->versionBatchId.get());
// batchId is continuous. (req.batchIndex-1) is the id of the just finished batch.
wait(self->versionBatchId.whenAtLeast(req.batchIndex - 1));

View File

@ -113,14 +113,11 @@ public:
std::map<UID, RestoreLoaderInterface> loadersInterf; // UID: loaderInterf's id
std::map<UID, RestoreApplierInterface> appliersInterf; // UID: applierInterf's id
RestoreApplierInterface masterApplierInterf; // TODO: Delete
NotifiedVersion versionBatchId; // Continuously increase for each versionBatch
bool versionBatchStart = false;
uint32_t inProgressFlag = 0;
RestoreRoleData() : role(RestoreRole::Invalid){};
virtual ~RestoreRoleData() {}

View File

@ -52,8 +52,6 @@ struct RestoreWorkerData : NonCopyable, public ReferenceCounted<RestoreWorkerDa
Optional<RestoreLoaderInterface> loaderInterf;
Optional<RestoreApplierInterface> applierInterf;
uint32_t inProgressFlag = 0; // To avoid race between duplicate message delivery that invokes the same actor multiple times
UID id() const { return workerID; };
RestoreWorkerData() = default;