FastRestore:Filter out log mutations whose version is smaller than range mutation version

This commit is contained in:
Meng Xu 2020-04-15 13:32:52 -07:00
parent a3598a7616
commit d6c1baa784
7 changed files with 161 additions and 20 deletions

View File

@ -362,20 +362,24 @@ struct RestoreSysInfoRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 75960741;
RestoreSysInfo sysInfo;
Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> rangeVersions;
ReplyPromise<RestoreCommonReply> reply;
RestoreSysInfoRequest() = default;
explicit RestoreSysInfoRequest(RestoreSysInfo sysInfo) : sysInfo(sysInfo) {}
explicit RestoreSysInfoRequest(RestoreSysInfo sysInfo,
Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> rangeVersions)
: sysInfo(sysInfo), rangeVersions(rangeVersions) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, sysInfo, reply);
serializer(ar, sysInfo, rangeVersions, reply);
}
std::string toString() {
std::stringstream ss;
ss << "RestoreSysInfoRequest";
ss << "RestoreSysInfoRequest "
<< "rangeVersions.size:" << rangeVersions.size();
return ss.str();
}
};

View File

@ -112,6 +112,7 @@ public:
Val const& operator[]( const Key& k ) { return rangeContaining(k).value(); }
Ranges ranges() { return Ranges( Iterator(map.begin()), Iterator(map.lastItem()) ); }
// intersectingRanges returns [begin, end] where begin <= r.begin and end >= r.end
Ranges intersectingRanges( const Range& r ) { return Ranges(rangeContaining(r.begin), Iterator(map.lower_bound(r.end))); }
// containedRanges() will return all ranges that are fully contained by the passed range (note that a range fully contains itself)
Ranges containedRanges( const Range& r ) {

View File

@ -343,7 +343,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
return results;
} catch (Error& e) {
TraceEvent(SevWarn, "FileRestoreCorruptRangeFileBlock")
TraceEvent(SevError, "FileRestoreCorruptRangeFileBlock")
.error(e)
.detail("Filename", file->getFilename())
.detail("BlockOffset", offset)
@ -388,7 +388,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IA
return results;
} catch (Error& e) {
TraceEvent(SevWarn, "FileRestoreCorruptLogFileBlock")
TraceEvent(SevError, "FileRestoreCorruptLogFileBlock")
.error(e)
.detail("Filename", file->getFilename())
.detail("BlockOffset", offset)

View File

@ -38,6 +38,8 @@
#include "flow/actorcompiler.h" // has to be last include
#define MAX_VERSION (std::numeric_limits<int64_t>::max())
// RestoreConfig copied from FileBackupAgent.actor.cpp
// We copy RestoreConfig instead of using (and potentially changing) it in place
// to avoid conflict with the existing code.

View File

@ -38,7 +38,8 @@ typedef std::map<Standalone<StringRef>, uint32_t> SerializedMutationPartMap;
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier);
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset);
@ -126,6 +127,21 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
return Void();
}
static __inline__ bool _logMutationTooOld(KeyRangeMap<Version>* pRangeVersions, KeyRangeRef keyRange, Version v) {
auto ranges = pRangeVersions->intersectingRanges(keyRange);
Version minVersion = MAX_VERSION;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
minVersion = std::min(minVersion, r->value());
}
return minVersion >= v;
}
static __inline__ bool logMutationTooOld(KeyRangeMap<Version>* pRangeVersions, MutationRef mutation, Version v) {
return isRangeMutation(mutation)
? _logMutationTooOld(pRangeVersions, KeyRangeRef(mutation.param1, mutation.param2), v)
: _logMutationTooOld(pRangeVersions, KeyRangeRef(singleKeyRange(mutation.param1)), v);
}
// Assume: Only update the local data if it (applierInterf) has not been set
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self) {
TraceEvent("FastRestoreLoader", self->id()).detail("HandleRestoreSysInfoRequest", self->id());
@ -138,6 +154,22 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
}
self->appliersInterf = req.sysInfo.appliers;
// Update rangeVersions
ASSERT(self->rangeVersions.size() == 1); // rangeVersions has not been set
for (auto rv = req.rangeVersions.begin(); rv != req.rangeVersions.end(); ++rv) {
self->rangeVersions.insert(rv->first, rv->second);
}
// Debug message for range version in each loader
auto ranges = self->rangeVersions.ranges();
int i = 0;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
TraceEvent("FastRestoreLoader", self->id())
.detail("RangeIndex", i++)
.detail("RangeBegin", r->begin())
.detail("RangeEnd", r->end())
.detail("Version", r->value());
}
req.reply.send(RestoreCommonReply(self->id()));
}
@ -145,7 +177,8 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
// Parse a data block in a partitioned mutation log file and store mutations
// into "kvOpsIter" and samples into "samplesIter".
ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
NotifiedVersion* processedFileOffset, std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
KeyRangeMap<Version>* pRangeVersions, NotifiedVersion* processedFileOffset,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, RestoreAsset asset) {
state Standalone<StringRef> buf = makeString(asset.len);
state Reference<IAsyncFile> file = wait(bc->readFile(asset.filename));
@ -190,6 +223,11 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
MutationRef mutation;
rd >> mutation;
// Skip mutation whose commitVesion < range kv's version
if (logMutationTooOld(pRangeVersions, mutation, msgVersion.version)) {
continue;
}
// Should this mutation be skipped?
if (mutation.param1 >= asset.range.end ||
(isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
@ -228,7 +266,8 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
return Void();
}
ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatchData> batchData, UID loaderID,
ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, LoadingParam param,
Reference<LoaderBatchData> batchData, UID loaderID,
Reference<IBackupContainer> bc) {
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
@ -263,8 +302,8 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatc
} else {
// TODO: Sanity check the log file's range is overlapped with the restored version range
if (param.isPartitionedLog()) {
fileParserFutures.push_back(_parsePartitionedLogFileOnLoader(&processedFileOffset, kvOpsPerLPIter,
samplesIter, bc, subAsset));
fileParserFutures.push_back(_parsePartitionedLogFileOnLoader(
pRangeVersions, &processedFileOffset, kvOpsPerLPIter, samplesIter, bc, subAsset));
} else {
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
&mutationPartMap, bc, subAsset));
@ -274,7 +313,8 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatc
wait(waitForAll(fileParserFutures));
if (!param.isRangeFile && !param.isPartitionedLog()) {
_parseSerializedMutation(kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters, param.asset);
_parseSerializedMutation(pRangeVersions, kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters,
param.asset);
}
TraceEvent("FastRestoreLoaderProcessLoadingParamDone", loaderID).detail("LoadingParam", param.toString());
@ -304,7 +344,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
.detail("BatchIndex", req.batchIndex)
.detail("ProcessLoadParam", req.param.toString());
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc);
batchData->processedFileParams[req.param] =
_processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc);
isDuplicated = false;
} else {
TraceEvent("FastRestoreLoadFile", self->id())
@ -669,7 +710,8 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
// we may not get the entire mutation list for the version encoded_list_of_mutations:
// [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset) {
@ -709,6 +751,11 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
// Should this mutation be skipped?
// Skip mutation whose commitVesion < range kv's version
if (logMutationTooOld(pRangeVersions, mutation, commitVersion)) {
continue;
}
if (mutation.param1 >= asset.range.end ||
(isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
(!isRangeMutation(mutation) && mutation.param1 < asset.range.begin)) {

View File

@ -129,6 +129,8 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
std::map<int, Reference<LoaderBatchData>> batch;
std::map<int, Reference<LoaderBatchStatus>> status;
KeyRangeMap<Version> rangeVersions;
Reference<IBackupContainer> bc; // Backup container is used to read backup files
Key bcUrl; // The url used to get the bc

View File

@ -20,6 +20,7 @@
// This file implements the functions for RestoreMaster role
#include "fdbrpc/RangeMap.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupAgent.actor.h"
@ -39,6 +40,8 @@ ACTOR static Future<Void> clearDB(Database cx);
ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc, std::vector<RestoreFileFR>* rangeFiles,
std::vector<RestoreFileFR>* logFiles, Database cx,
RestoreRequest request);
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
std::vector<RestoreFileFR>* pRangeFiles, Key url);
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData> self, Database cx, RestoreRequest request);
ACTOR static Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self, Database cx);
@ -48,8 +51,8 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
ACTOR static Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker,
Reference<RestoreMasterData> masterData);
ACTOR static Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> masterWorker,
Reference<RestoreMasterData> masterData);
ACTOR static Future<Void> distributeRestoreSysInfo(Reference<RestoreMasterData> masterData,
KeyRangeMap<Version>* pRangeVersions);
ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequests(Database cx);
ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInterface> appliersInterf,
@ -79,7 +82,7 @@ ACTOR Future<Void> startRestoreMaster(Reference<RestoreWorkerData> masterWorker,
actors.add(updateHeartbeatTime(self));
actors.add(checkRolesLiveness(self));
wait(distributeRestoreSysInfo(masterWorker, self));
// wait(distributeRestoreSysInfo(masterWorker, self));
wait(startProcessRestoreRequests(self, cx));
} catch (Error& e) {
@ -148,14 +151,27 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
return Void();
}
ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> masterWorker,
Reference<RestoreMasterData> masterData) {
ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreMasterData> masterData,
KeyRangeMap<Version>* pRangeVersions) {
ASSERT(masterData.isValid());
ASSERT(!masterData->loadersInterf.empty());
RestoreSysInfo sysInfo(masterData->appliersInterf);
// Construct serializable KeyRange versions
Standalone<VectorRef<std::pair<KeyRangeRef, Version>>> rangeVersionsVec;
auto ranges = pRangeVersions->ranges();
int i = 0;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
rangeVersionsVec.push_back(rangeVersionsVec.arena(),
std::make_pair(KeyRangeRef(r->begin(), r->end()), r->value()));
TraceEvent(SevDebug, "DistributeRangeVersions")
.detail("RangeIndex", i++)
.detail("RangeBegin", r->begin())
.detail("RangeEnd", r->end())
.detail("RangeVersion", r->value());
}
std::vector<std::pair<UID, RestoreSysInfoRequest>> requests;
for (auto& loader : masterData->loadersInterf) {
requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo));
requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo, rangeVersionsVec));
}
TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", masterData->id())
@ -233,12 +249,13 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
state std::vector<RestoreFileFR> rangeFiles;
state std::vector<RestoreFileFR> logFiles;
state std::vector<RestoreFileFR> allFiles;
state KeyRangeMap<Version> rangeVersions(MAX_VERSION, allKeys.end);
state ActorCollection actors(false);
self->initBackupContainer(request.url);
// Get all backup files' description and save them to files
Version targetVersion = wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, cx, request));
state Version targetVersion = wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, cx, request));
ASSERT(targetVersion > 0);
std::sort(rangeFiles.begin(), rangeFiles.end());
@ -247,6 +264,11 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
std::tie(f2.endVersion, f2.beginVersion, f2.fileIndex, f2.fileName);
});
// Build range versions: version of key ranges in range file
wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url));
wait(distributeRestoreSysInfo(self, &rangeVersions));
// Divide files into version batches.
self->buildVersionBatches(rangeFiles, logFiles, &self->versionBatches, targetVersion);
self->dumpVersionBatches(self->versionBatches);
@ -675,6 +697,69 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
return request.targetVersion;
}
ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersions, RestoreFileFR* file,
Reference<IBackupContainer> bc) {
TraceEvent("FastRestoreMasterDecodeRangeVersion").detail("File", file->toString());
Reference<IAsyncFile> inFile = wait(bc->readFile(file->fileName));
state bool beginKeySet = false;
Key beginKey;
Key endKey;
for (int64_t j = 0; j < file->fileSize; j += file->blockSize) {
int64_t len = std::min<int64_t>(file->blockSize, file->fileSize - j);
Standalone<VectorRef<KeyValueRef>> blockData = wait(parallelFileRestore::decodeRangeFileBlock(inFile, j, len));
if (!beginKeySet) {
beginKey = blockData.front().key;
}
endKey = blockData.back().key;
}
// First and last key are the range for this file: endKey is exclusive
KeyRange fileRange = KeyRangeRef(beginKey.contents(), endKey.contents());
TraceEvent("FastRestoreMasterInsertRangeVersion")
.detail("DecodedRangeFile", file->fileName)
.detail("KeyRange", fileRange)
.detail("Version", file->version)
.detail("DataSize", blockData.contents().size());
// Update version for pRangeVersions's ranges in fileRange
auto ranges = pRangeVersions->modify(fileRange);
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
r->value() = r->value() == MAX_VERSION ? file->version : std::max(r->value(), file->version);
}
// Dump the new key ranges
ranges = pRangeVersions->ranges();
int i = 0;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
TraceEvent(SevDebug, "RangeVersionsAfterUpdate")
.detail("File", file->toString())
.detail("FileRange", fileRange.toString())
.detail("FileVersion", file->version)
.detail("RangeIndex", i++)
.detail("RangeBegin", r->begin())
.detail("RangeEnd", r->end())
.detail("RangeVersion", r->value());
}
return Void();
}
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
std::vector<RestoreFileFR>* pRangeFiles, Key url) {
Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
// Key ranges not in range files are empty;
// Assign highest version to avoid applying any mutation in these ranges
state int fileIndex = 0;
state std::vector<Future<Void>> fInsertRangeVersions;
for (; fileIndex < pRangeFiles->size(); ++fileIndex) {
fInsertRangeVersions.push_back(insertRangeVersion(pRangeVersions, &pRangeFiles->at(fileIndex), bc));
}
wait(waitForAll(fInsertRangeVersions));
return Void();
}
ACTOR static Future<Void> clearDB(Database cx) {
wait(runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);