RestoreApplier:Costmic change based on review

This commit is contained in:
Meng Xu 2020-06-06 21:17:57 -07:00
parent 8044368d90
commit 94be3afcf8

View File

@ -220,15 +220,15 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::vector<Future<Optional<Value>>> fValues(incompleteStagingKeys.size(), Never());
state int retries = 0;
// state UID randomID = deterministicRandom()->randomUniqueID();
state UID randomID = deterministicRandom()->randomUniqueID();
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
//.detail("RandomUID", randomID)
.detail("RandomUID", randomID)
.detail("BatchIndex", batchIndex)
.detail("GetKeys", incompleteStagingKeys.size())
.detail("DelayTime", delayTime);
state std::set<int> keyNotFounds;
state std::set<int> keysNotFound;
state int i = 0;
state bool hasError = false;
@ -240,13 +240,13 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].isReady() || !keyNotFounds.count(i)) {
if (!fValues[i].isReady() || !keysNotFound.count(i)) {
fValues[i] = tr->get(key.first);
}
++i;
}
for (i = 0; i < incompleteStagingKeys.size(); ++i) {
if (keyNotFounds.count(i)) {
if (keysNotFound.count(i)) {
continue;
}
wait(success(fValues[i])); // NOTE: This may be waiting for ever!
@ -254,7 +254,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
} catch (Error& e) {
if (e.code() == error_code_key_not_found) { // e.code() == error_code_transaction_too_old || e.code() ==
// error_code_future_version
keyNotFounds.insert(i);
keysNotFound.insert(i);
} else {
hasError = true;
}
@ -276,7 +276,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
ASSERT(fValues.size() == incompleteStagingKeys.size());
int i = 0;
for (auto& key : incompleteStagingKeys) {
if (keyNotFounds.count(i) || (!fValues[i].get().present())) { // Key not exist in DB
if (keysNotFound.count(i) || (!fValues[i].get().present())) { // Key not exist in DB
// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
.detail("BatchIndex", batchIndex)
@ -301,7 +301,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
}
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
//.detail("RandomUID", randomID)
.detail("RandomUID", randomID)
.detail("BatchIndex", batchIndex)
.detail("GetKeys", incompleteStagingKeys.size())
.detail("DelayTime", delayTime);
@ -392,7 +392,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
incompleteStagingKeys.clear();
}
}
if (numKeysInBatch >= 1) {
if (numKeysInBatch > 0) {
fGetAndComputeKeys.push_back(
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
}