getAndComputeStagingKeys: Improved handling of not exist keys

This commit is contained in:
Meng Xu 2020-06-03 15:28:59 -07:00
parent 6edcb8cf00
commit d5025a1779
2 changed files with 32 additions and 21 deletions

View File

@ -224,39 +224,49 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
.detail("BatchIndex", batchIndex)
.detail("GetKeys", incompleteStagingKeys.size())
.detail("DelayTime", delayTime);
state std::set<int> keyNotFounds;
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : incompleteStagingKeys) {
fValues.push_back(tr->get(key.first));
}
state int i = 0;
state bool hasError = false;
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : incompleteStagingKeys) {
fValues.push_back(tr->get(key.first));
hasError = false;
for (i = 0; i < incompleteStagingKeys.size(); ++i) {
try {
if (keyNotFounds.count(i)) {
continue;
}
wait(success(fValues[i]));
} catch (Error& e) {
if (e.code() == error_code_key_not_found) {
keyNotFounds.push_back(i);
} else {
hasError = true;
}
wait(tr->onError(e));
}
wait(waitForAll(fValues));
}
if (!hasError) {
break;
} catch (Error& e) {
if (retries++ > 10) { // TODO: Can we stop retry at the first error?
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck", applierID)
.detail("BatchIndex", batchIndex)
.detail("GetKeys", incompleteStagingKeys.size())
.error(e);
break;
}
wait(tr->onError(e));
fValues.clear();
}
}
ASSERT(fValues.size() == incompleteStagingKeys.size());
int i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].get().present()) { // Debug info to understand which key does not exist in DB
if (keyNotFounds.count(i) || !fValues[i].get().present()) { // Key not exist in DB
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
.detail("BatchIndex", batchIndex)
.detail("Key", key.first)
.detail("Reason", "Not found in DB")
.detail("IsReady", fValues[i].isReady())
.detail("PendingMutations", key.second->second.pendingMutations.size())
.detail("StagingKeyType", (int)key.second->second.type);
.detail("StagingKeyType", getTypeString(key.second->second.type));
for (auto& vm : key.second->second.pendingMutations) {
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
.detail("PendingMutationVersion", vm.first.toString())

View File

@ -126,7 +126,8 @@ struct StagingKey {
.detail("Value", val)
.detail("MType", type < MutationRef::MAX_ATOMIC_OP ? getTypeString(type) : "[Unset]")
.detail("LargestPendingVersion",
(pendingMutations.empty() ? "[none]" : pendingMutations.rbegin()->first.toString()));
(pendingMutations.empty() ? "[none]" : pendingMutations.rbegin()->first.toString()))
.detail("PendingMutations", pendingMutations.size());
std::map<LogMessageVersion, Standalone<MutationRef>>::iterator lb = pendingMutations.lower_bound(version);
if (lb == pendingMutations.end()) {
return;