FastRestoreApplier:Add delay to avoid overwelming DB

This commit is contained in:
Meng Xu 2020-05-05 08:46:18 -07:00
parent 67b9e0b29a
commit 62de02fb2c

View File

@ -165,11 +165,16 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
}
// Clear all ranges in input ranges
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, Database cx,
UID applierID, int batchIndex) {
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, double delayTime,
Database cx, UID applierID, int batchIndex) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int count = 0;
state double size = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent("FastRestoreApplierClearRangeMutationsStart", applierID)
.detail("BatchIndex", batchIndex)
.detail("Ranges", ranges.size())
.detail("DelayTime", delayTime);
loop {
try {
tr->reset();
@ -198,15 +203,17 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
ACTOR static Future<Void> getAndComputeStagingKeys(
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, Database cx, UID applierID,
int batchIndex) {
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
UID applierID, int batchIndex) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::vector<Future<Optional<Value>>> fValues;
state int retries = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
.detail("BatchIndex", batchIndex)
.detail("GetKeys", incompleteStagingKeys.size());
.detail("GetKeys", incompleteStagingKeys.size())
.detail("DelayTime", delayTime);
loop {
try {
tr->reset();
@ -275,6 +282,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
state std::vector<Future<Void>> fClearRanges;
Standalone<VectorRef<KeyRangeRef>> clearRanges;
double curTxnSize = 0;
double delayTime = 0;
for (auto& rangeMutation : batchData->stagingKeyRanges) {
KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2);
debugFRMutation("FastRestoreApplierPrecomputeMutationsResultClearRange", rangeMutation.version.version,
@ -282,13 +290,14 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
clearRanges.push_back_deep(clearRanges.arena(), range);
curTxnSize += range.expectedSize();
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx, applierID, batchIndex));
fClearRanges.push_back(applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex));
delayTime += 0.1;
clearRanges = Standalone<VectorRef<KeyRangeRef>>();
curTxnSize = 0;
}
}
if (curTxnSize > 0) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx, applierID, batchIndex));
fClearRanges.push_back(applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex));
}
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
@ -331,6 +340,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
int numKeysInBatch = 0;
double delayTime = 0; // Start transactions at different time to avoid overwelming FDB.
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
if (!stagingKeyIter->second.hasBaseValue()) {
incompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter);
@ -338,13 +348,16 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
numKeysInBatch++;
}
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID, batchIndex));
fGetAndComputeKeys.push_back(
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
delayTime += 0.1;
numKeysInBatch = 0;
incompleteStagingKeys.clear();
}
}
if (numKeysInBatch > 0) {
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID, batchIndex));
fGetAndComputeKeys.push_back(
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
}
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)