FastRestore:Handle retriable blob error

This commit is contained in:
Meng Xu 2020-09-09 07:23:18 -07:00
parent 20733ae1eb
commit 83d1350d8d
2 changed files with 98 additions and 17 deletions

View File

@ -307,6 +307,12 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
if (ongoingReplies[j].isReady()) { if (ongoingReplies[j].isReady()) {
std::get<2>(replyDurations[ongoingRepliesIndex[j]]) = now(); std::get<2>(replyDurations[ongoingRepliesIndex[j]]) = now();
--oustandingReplies; --oustandingReplies;
} else if (ongoingReplies[j].isError()) {
// When this happens,
// the above assertion ASSERT(ongoingReplies.size() == oustandingReplies) will fail
TraceEvent(SevError, "FastRestoreGetBatchRepliesReplyError")
.detail("OngoingReplyIndex", j)
.detail("FutureError", ongoingReplies[j].getError().what());
} }
} }
} }

View File

@ -57,6 +57,9 @@ ACTOR Future<Void> sendMutationsToApplier(
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset, ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap, SerializedMutationListMap* mutationMap,
Reference<IBackupContainer> bc, RestoreAsset asset); Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader( ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc, std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
@ -472,6 +475,36 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
return Void(); return Void();
} }
// wrapper of _parsePartitionedLogFileOnLoader to retry on blob error
ACTOR static Future<Void> parsePartitionedLogFileOnLoader(
KeyRangeMap<Version>* pRangeVersions, NotifiedVersion* processedFileOffset,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, RestoreAsset asset) {
state int readFileRetries = 0;
loop {
try {
wait(_parsePartitionedLogFileOnLoader(pRangeVersions, processedFileOffset, kvOpsIter, samplesIter, cc, bc,
asset));
break;
} catch (Error& e) {
if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
e.code() == error_code_restore_corrupted_data_padding) { // no retriable error
TraceEvent(SevError, "FileRestoreCorruptedPartitionedLogFileBlock").error(e);
throw;
} else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
// blob http request failure, retry
TraceEvent(SevWarnAlways, "FastRestoreDecodedPartitionedLogFileConnectionFailure")
.detail("Retries", ++readFileRetries)
.error(e);
wait(delayJittered(0.1));
}
}
}
return Void();
}
ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, LoadingParam param, ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, LoadingParam param,
Reference<LoaderBatchData> batchData, UID loaderID, Reference<LoaderBatchData> batchData, UID loaderID,
Reference<IBackupContainer> bc) { Reference<IBackupContainer> bc) {
@ -508,12 +541,12 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
} else { } else {
// TODO: Sanity check the log file's range is overlapped with the restored version range // TODO: Sanity check the log file's range is overlapped with the restored version range
if (param.isPartitionedLog()) { if (param.isPartitionedLog()) {
fileParserFutures.push_back(_parsePartitionedLogFileOnLoader(pRangeVersions, &processedFileOffset, fileParserFutures.push_back(parsePartitionedLogFileOnLoader(pRangeVersions, &processedFileOffset,
kvOpsPerLPIter, samplesIter, kvOpsPerLPIter, samplesIter,
&batchData->counters, bc, subAsset)); &batchData->counters, bc, subAsset));
} else { } else {
fileParserFutures.push_back( fileParserFutures.push_back(
_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset)); parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset));
} }
} }
} }
@ -1122,21 +1155,36 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// Sanity check the range file is within the restored version range // Sanity check the range file is within the restored version range
ASSERT_WE_THINK(asset.isInVersionRange(version)); ASSERT_WE_THINK(asset.isInVersionRange(version));
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
state Standalone<VectorRef<KeyValueRef>> blockData; state Standalone<VectorRef<KeyValueRef>> blockData;
// should retry here // should retry here
try { state int readFileRetries = 0;
Standalone<VectorRef<KeyValueRef>> kvs = loop {
wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len)); try {
TraceEvent("FastRestoreLoaderDecodedRangeFile") // The set of key value version is rangeFile.version. the key-value set in the same range file has the same
.detail("BatchIndex", asset.batchIndex) // version
.detail("Filename", asset.filename) Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
.detail("DataSize", kvs.contents().size()); Standalone<VectorRef<KeyValueRef>> kvs =
blockData = kvs; wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
} catch (Error& e) { TraceEvent("FastRestoreLoaderDecodedRangeFile")
TraceEvent(SevError, "FileRestoreCorruptRangeFileBlock").error(e); .detail("BatchIndex", asset.batchIndex)
throw; .detail("Filename", asset.filename)
.detail("DataSize", kvs.contents().size());
blockData = kvs;
break;
} catch (Error& e) {
if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
e.code() == error_code_restore_corrupted_data_padding) { // no retriable error
TraceEvent(SevError, "FileRestoreCorruptedRangeFileBlock").error(e);
throw;
} else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
// blob http request failure, retry
TraceEvent(SevWarnAlways, "FastRestoreDecodedRangeFileConnectionFailure")
.detail("Retries", ++readFileRetries)
.error(e);
wait(delayJittered(0.1));
}
}
} }
// First and last key are the range for this file // First and last key are the range for this file
@ -1234,6 +1282,33 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
return Void(); return Void();
} }
// retry on _parseLogFileToMutationsOnLoader
ACTOR static Future<Void> parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* pMutationMap,
Reference<IBackupContainer> bc, RestoreAsset asset) {
state int readFileRetries = 0;
loop {
try {
wait(_parseLogFileToMutationsOnLoader(pProcessedFileOffset, pMutationMap, bc, asset));
break;
} catch (Error& e) {
if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
e.code() == error_code_restore_corrupted_data_padding) { // non retriable error
TraceEvent(SevError, "FileRestoreCorruptedLogFileBlock").error(e);
throw;
} else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed ||
e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) {
// blob http request failure, retry
TraceEvent(SevWarnAlways, "FastRestoreDecodedLogFileConnectionFailure")
.detail("Retries", ++readFileRetries)
.error(e);
wait(delayJittered(0.1));
}
}
}
return Void();
}
// Return applier IDs that are used to apply key-values // Return applier IDs that are used to apply key-values
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier) { std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier) {
std::vector<UID> applierIDs; std::vector<UID> applierIDs;