FRApplier:Fix applyingDataBytes accounting at exception

When exception is thrown out after txnSize is calculated but before
it is accounted into applyingDataBytes, we will decrease applyingDataBytes in the
error handling block incorrectly.
This commit is contained in:
Meng Xu 2020-09-23 15:18:59 -07:00
parent a4aad591fd
commit aa683c0d26

View File

@ -469,14 +469,14 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
return Void(); return Void();
} }
bool notEnoughOutstandingTxns(double targetMB, double applyingDataBytes) { bool okToReleaseTxns(double targetMB, double applyingDataBytes) {
return applyingDataBytes < targetMB * 1024 * 1024; return applyingDataBytes < targetMB * 1024 * 1024;
} }
ACTOR static Future<Void> shouldReleaseTransaction(double* targetMB, double* applyingDataBytes, ACTOR static Future<Void> shouldReleaseTransaction(double* targetMB, double* applyingDataBytes,
AsyncTrigger* releaseTxns) { AsyncTrigger* releaseTxns) {
loop { loop {
if (notEnoughOutstandingTxns(*targetMB, *applyingDataBytes)) { if (okToReleaseTxns(*targetMB, *applyingDataBytes)) {
break; break;
} else { } else {
wait(releaseTxns->onTrigger()); wait(releaseTxns->onTrigger());
@ -505,10 +505,12 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
state int clears = 0; state int clears = 0;
state Key endKey = begin->first; state Key endKey = begin->first;
state double txnSize = 0; state double txnSize = 0;
state double txnSizeUsed = 0; // txn size accounted in applyingDataBytes
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first); TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
loop { loop {
try { try {
txnSize = 0; txnSize = 0;
txnSizeUsed = 0;
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
std::map<Key, StagingKey>::iterator iter = begin; std::map<Key, StagingKey>::iterator iter = begin;
@ -552,20 +554,21 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
.detail("Sets", sets) .detail("Sets", sets)
.detail("Clears", clears); .detail("Clears", clears);
tr->addWriteConflictRange(KeyRangeRef(begin->first, keyAfter(endKey))); // Reduce resolver load tr->addWriteConflictRange(KeyRangeRef(begin->first, keyAfter(endKey))); // Reduce resolver load
*applyingDataBytes += txnSize; // Must account for applying bytes before wait for write traffic control txnSizeUsed = txnSize;
*applyingDataBytes += txnSizeUsed; // Must account for applying bytes before wait for write traffic control
wait(tr->commit()); wait(tr->commit());
cc->appliedTxns += 1; cc->appliedTxns += 1;
cc->appliedBytes += txnSize; cc->appliedBytes += txnSize;
*appliedBytes += txnSize; *appliedBytes += txnSize;
*applyingDataBytes -= txnSize; *applyingDataBytes -= txnSizeUsed;
if (notEnoughOutstandingTxns(*targetMB, *applyingDataBytes)) { if (okToReleaseTxns(*targetMB, *applyingDataBytes)) {
releaseTxnTrigger->trigger(); releaseTxnTrigger->trigger();
} }
break; break;
} catch (Error& e) { } catch (Error& e) {
cc->appliedTxnRetries += 1; cc->appliedTxnRetries += 1;
wait(tr->onError(e)); wait(tr->onError(e));
*applyingDataBytes -= txnSize; *applyingDataBytes -= txnSizeUsed;
} }
} }
return Void(); return Void();