From aa683c0d26b91b2182450e13149c76f09d247678 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 23 Sep 2020 15:18:59 -0700 Subject: [PATCH] FRApplier:Fix applyingDataBytes accounting at exception When exception is thrown out after txnSize is calculated but before it is accounted into applyingDataBytes, we will decrease applyingDataBytes in the error handling block incorrectly. --- fdbserver/RestoreApplier.actor.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index c5d45cc3cd..b6b70a9a41 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -469,14 +469,14 @@ ACTOR static Future precomputeMutationsResult(Reference return Void(); } -bool notEnoughOutstandingTxns(double targetMB, double applyingDataBytes) { +bool okToReleaseTxns(double targetMB, double applyingDataBytes) { return applyingDataBytes < targetMB * 1024 * 1024; } ACTOR static Future shouldReleaseTransaction(double* targetMB, double* applyingDataBytes, AsyncTrigger* releaseTxns) { loop { - if (notEnoughOutstandingTxns(*targetMB, *applyingDataBytes)) { + if (okToReleaseTxns(*targetMB, *applyingDataBytes)) { break; } else { wait(releaseTxns->onTrigger()); @@ -505,10 +505,12 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera state int clears = 0; state Key endKey = begin->first; state double txnSize = 0; + state double txnSizeUsed = 0; // txn size accounted in applyingDataBytes TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first); loop { try { txnSize = 0; + txnSizeUsed = 0; tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); std::map::iterator iter = begin; @@ -552,20 +554,21 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera .detail("Sets", sets) .detail("Clears", clears); tr->addWriteConflictRange(KeyRangeRef(begin->first, keyAfter(endKey))); // Reduce resolver load - *applyingDataBytes += txnSize; // Must account for applying bytes before wait for write traffic control + txnSizeUsed = txnSize; + *applyingDataBytes += txnSizeUsed; // Must account for applying bytes before wait for write traffic control wait(tr->commit()); cc->appliedTxns += 1; cc->appliedBytes += txnSize; *appliedBytes += txnSize; - *applyingDataBytes -= txnSize; - if (notEnoughOutstandingTxns(*targetMB, *applyingDataBytes)) { + *applyingDataBytes -= txnSizeUsed; + if (okToReleaseTxns(*targetMB, *applyingDataBytes)) { releaseTxnTrigger->trigger(); } break; } catch (Error& e) { cc->appliedTxnRetries += 1; wait(tr->onError(e)); - *applyingDataBytes -= txnSize; + *applyingDataBytes -= txnSizeUsed; } } return Void();