Catch initial flush error and avoid crashing blob manager (#10836)

This commit is contained in:
Hui Liu 2023-08-25 20:32:20 -07:00 committed by GitHub
parent f43b20e15c
commit 30d4f07395
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -5675,6 +5675,45 @@ ACTOR Future<Void> updateLastFlushVersion(Database db, Version flushVersion) {
return Void();
}
// Flush a single blob range up to logEndVersion, retrying on non-fatal errors.
//
// The flush request is issued with the blob manager's current epoch. Only
// blob_granule_transaction_too_old is treated as retryable; any other error is
// rethrown to the caller. On a retryable error the range is re-checked against
// bmData->knownBlobRanges: if no part of it is still blobified the flush is
// skipped and the actor completes normally, otherwise the request is retried
// after a jittered delay.
ACTOR Future<Void> tryFlushRange(Reference<BlobManagerData> bmData, KeyRange range, Version logEndVersion) {
	state int retryCount = 0;
	loop {
		try {
			FlushGranuleRequest req(bmData->epoch, range, logEndVersion, false);
			wait(success(doBlobGranuleRequests(bmData->db, range, req, &BlobWorkerInterface::flushGranuleRequest)));
			return Void();
		} catch (Error& e) {
			// Anything other than "transaction too old" is unretryable: propagate it.
			if (e.code() != error_code_blob_granule_transaction_too_old) {
				throw;
			}

			// Flushing the whole key space can take a long time, and in the meantime
			// some ranges may have been unblobified or purged. Before retrying, check
			// whether any part of this range is still a known blob range.
			bool stillBlobified = false;
			for (auto& r : bmData->knownBlobRanges.intersectingRanges(range)) {
				if (!r.value()) {
					continue;
				}
				stillBlobified = true;
				break;
			}

			// Nothing left to flush here: report the skip and finish successfully.
			if (!stillBlobified) {
				TraceEvent("BlobGranulesFlushSkip").detail("Range", range).detail("Version", logEndVersion);
				CODE_PROBE(true, "Skip blob granule flush", probe::decoration::rare);
				return Void();
			}

			// Still blobified: log the retry (escalated to SevError once more than
			// 100 retries have happened), back off briefly, and go around again.
			TraceEvent(retryCount > 100 ? SevError : SevInfo, "BlobGranulesFlushRetry")
			    .detail("Range", range)
			    .detail("Version", logEndVersion);
			CODE_PROBE(true, "Retry blob granule flush", probe::decoration::rare);
			wait(delayJittered(1));
			++retryCount;
		}
	}
}
// Try to flush blob granules. Return the flushed version if it's successful.
ACTOR Future<Version> maybeFlushGranules(Reference<BlobManagerData> bmData) {
state BlobGranuleBackupConfig config;
@ -5708,9 +5747,7 @@ ACTOR Future<Version> maybeFlushGranules(Reference<BlobManagerData> bmData) {
TraceEvent("FlushingBlobGranules").detail("Ranges", flushRanges.size());
state std::vector<Future<Void>> futures;
for (auto& range : flushRanges) {
FlushGranuleRequest req(bmData->epoch, range, logEndVersion, false);
Future<Void> future =
success(doBlobGranuleRequests(bmData->db, range, req, &BlobWorkerInterface::flushGranuleRequest));
Future<Void> future = tryFlushRange(bmData, range, logEndVersion);
futures.push_back(future);
if (futures.size() > SERVER_KNOBS->BLOB_GRANULES_FLUSH_BATCH_SIZE) {
wait(waitForAll(futures));