FastRestoreApplier:Add sanity check and trace for debugging stall

This commit is contained in:
Meng Xu 2020-05-04 22:32:57 -07:00
parent 2fec56e7e2
commit 67b9e0b29a

View File

@ -165,8 +165,11 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
}
// Clear all ranges in input ranges
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, Database cx) {
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, Database cx,
UID applierID, int batchIndex) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int count = 0;
state double size = 0;
loop {
try {
tr->reset();
@ -180,6 +183,13 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
wait(tr->commit());
break;
} catch (Error& e) {
count++;
if (count > 100) {
TraceEvent(SevWarnAlways, "RestoreApplierApplyClearRangeMutationsStuck", applierID)
.detail("BatchIndex", batchIndex)
.detail("ClearRanges", ranges.size())
.error(e);
}
wait(tr->onError(e));
}
}
@ -263,33 +273,32 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
.detail("Step", "Applying clear range mutations to DB")
.detail("ClearRanges", batchData->stagingKeyRanges.size());
state std::vector<Future<Void>> fClearRanges;
std::vector<Standalone<VectorRef<KeyRangeRef>>> clearBuf;
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
Standalone<VectorRef<KeyRangeRef>> clearRanges = clearBuf.back();
Standalone<VectorRef<KeyRangeRef>> clearRanges;
double curTxnSize = 0;
for (auto& rangeMutation : batchData->stagingKeyRanges) {
KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2);
debugFRMutation("FastRestoreApplierPrecomputeMutationsResultClearRange", rangeMutation.version.version,
MutationRef(MutationRef::ClearRange, range.begin, range.end));
clearRanges.push_back(clearRanges.arena(), range);
clearRanges.push_back_deep(clearRanges.arena(), range);
curTxnSize += range.expectedSize();
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx));
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
clearRanges = clearBuf.back();
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx, applierID, batchIndex));
clearRanges = Standalone<VectorRef<KeyRangeRef>>();
curTxnSize = 0;
}
}
if (curTxnSize > 0) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx));
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx, applierID, batchIndex));
}
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Applying clear range mutations to staging keys")
.detail("ClearRanges", batchData->stagingKeyRanges.size());
.detail("ClearRanges", batchData->stagingKeyRanges.size())
.detail("FutureClearRanges", fClearRanges.size());
for (auto& rangeMutation : batchData->stagingKeyRanges) {
ASSERT(rangeMutation.mutation.param1 <= rangeMutation.mutation.param2);
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param2);
while (lb != ub) {
@ -306,6 +315,10 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
lb++;
}
}
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Wait on applying clear range mutations to DB")
.detail("FutureClearRanges", fClearRanges.size());
wait(waitForAll(fClearRanges));
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)