Applier:getAndComputeStagingKeys:reset txn at first error

By the time tr->onError() is ready, the txn state has already been reset.
We cannot wait on a get() future from that txn because its state has been deleted.
If we do, it will throw a txn_cancelled error, which will be thrown all the way
up to the RestoreApplier main loop.

The batchData->dbApplier future, which is assigned by writeMutationsToDB(self->id(), req.batchIndex, batchData, cx),
will then become ready with isError() set. This makes every handleApplyToDBRequest throw an error silently.
This commit is contained in:
Meng Xu 2020-06-05 16:40:19 -07:00
parent e9af22085b
commit ffe949b04d

View File

@ -83,6 +83,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
updateProcessStats(self);
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
}
when(wait(actors.getResult())) {}
when(wait(exitRole)) {
TraceEvent("RestoreApplierCoreExitRole", self->id());
break;
@ -92,6 +93,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
TraceEvent(SevWarn, "FastRestoreApplierError", self->id())
.detail("RequestType", requestTypeStr)
.error(e, true);
actors.clear(false);
break;
}
}
@ -232,38 +234,39 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
state bool hasError = false;
loop {
hasError = false;
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].isReady() || !keyNotFounds.count(i)) {
fValues[i] = tr->get(key.first);
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].isReady() || !keyNotFounds.count(i)) {
fValues[i] = tr->get(key.first);
}
++i;
}
++i;
}
for (i = 0; i < incompleteStagingKeys.size(); ++i) {
try {
for (i = 0; i < incompleteStagingKeys.size(); ++i) {
if (keyNotFounds.count(i)) {
continue;
}
wait(success(fValues[i])); // NOTE: This may be waiting for ever!
} catch (Error& e) {
if (e.code() == error_code_key_not_found) { // e.code() == error_code_transaction_too_old || e.code() ==
// error_code_future_version
keyNotFounds.insert(i);
} else {
hasError = true;
}
if (retries > incompleteStagingKeys.size()) {
TraceEvent(SevError, "GetAndComputeStagingKeys", applierID)
.detail("BatchIndex", batchIndex)
.detail("KeyIndex", i)
.error(e);
}
wait(tr->onError(e));
}
} catch (Error& e) {
if (e.code() == error_code_key_not_found) { // e.code() == error_code_transaction_too_old || e.code() ==
// error_code_future_version
keyNotFounds.insert(i);
} else {
hasError = true;
}
if (retries > incompleteStagingKeys.size()) {
TraceEvent(SevError, "GetAndComputeStagingKeys", applierID)
.detail("BatchIndex", batchIndex)
.detail("KeyIndex", i)
.error(e);
}
wait(tr->onError(e));
}
if (!hasError) {
break;
}
@ -528,6 +531,7 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
.detail("FinishedBatch", self->finishedBatch.get());
// Ensure batch (i-1) is applied before batch i
// TODO: Add a counter to warn when too many requests are waiting on the actor
wait(self->finishedBatch.whenAtLeast(req.batchIndex - 1));
state bool isDuplicated = true;
@ -549,6 +553,8 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
}
// The apply-to-DB future must have been assigned for this batch before we wait on it.
ASSERT(batchData->dbApplier.present());
// The writeMutationsToDB actor must not have finished with an error: we cannot blindly
// retry it because it is not idempotent. Fail loudly here instead of letting the wait()
// below rethrow the stored error silently on every request.
ASSERT(!batchData->dbApplier.get().isError());
wait(batchData->dbApplier.get());