RestoreApplier:getAndComputeStagingKeys:retry for keys that exist in DB

Test shows that we cannot just skip the key that exist in DB but has
future_version error.
This commit is contained in:
Meng Xu 2020-06-03 21:17:27 -07:00
parent 87a557dcb4
commit 633587a95a

View File

@ -216,7 +216,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
UID applierID, int batchIndex) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::vector<Future<Optional<Value>>> fValues;
state std::vector<Future<Optional<Value>>> fValues(incompleteStagingKeys.size(), Never());
state int retries = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
@ -226,17 +226,20 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
.detail("DelayTime", delayTime);
state std::set<int> keyNotFounds;
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : incompleteStagingKeys) {
fValues.push_back(tr->get(key.first));
}
state int i = 0;
state bool hasError = false;
loop {
hasError = false;
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].isReady() || !keyNotFounds.count(i)) {
fValues[i] = tr->get(key.first);
}
++i;
}
for (i = 0; i < incompleteStagingKeys.size(); ++i) {
try {
if (keyNotFounds.count(i)) {
@ -244,13 +247,13 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
}
wait(success(fValues[i]));
} catch (Error& e) {
if (e.code() == error_code_key_not_found || e.code() == error_code_transaction_too_old ||
e.code() == error_code_future_version) {
if (e.code() == error_code_key_not_found) { // e.code() == error_code_transaction_too_old || e.code() ==
// error_code_future_version
keyNotFounds.insert(i);
} else {
hasError = true;
}
if (retries > 20) {
if (retries > incompleteStagingKeys.size()) {
TraceEvent(SevError, "GetAndComputeStagingKeys", applierID)
.detail("BatchIndex", batchIndex)
.detail("KeyIndex", i)