added changes to allow writing of last epoch end version to special keys when performing recovery due to snapshot

This commit is contained in:
Jon Fu 2020-09-02 15:44:55 -04:00
parent d334b6484e
commit 22996284c7
5 changed files with 60 additions and 2 deletions

View File

@ -1063,5 +1063,4 @@ const KeyRangeRef testOnlyTxnStateStorePrefixRange(
const KeyRef writeRecoveryKey = LiteralStringRef("\xff/writeRecovery");
const ValueRef writeRecoveryKeyTrue = LiteralStringRef("1");
const ValueRef writeRecoveryKeyFalse = LiteralStringRef("0");
const KeyRef snapshotEndVersionKey = LiteralStringRef("\xff/snapshotEndVersion");

View File

@ -398,7 +398,7 @@ extern const KeyRangeRef testOnlyTxnStateStorePrefixRange;
// Snapshot + Incremental Restore
extern const KeyRef writeRecoveryKey;
extern const ValueRef writeRecoveryKeyTrue, writeRecoveryKeyFalse;
extern const ValueRef writeRecoveryKeyTrue;
extern const KeyRef snapshotEndVersionKey;
#pragma clang diagnostic pop

View File

@ -300,6 +300,9 @@ void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRe
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
confChange = true;
TEST(true); // Recovering at a higher version.
} else if (m.param1 == writeRecoveryKey) {
TraceEvent("WriteRecoveryKeySet", dbgid);
if (!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
}
else if (m.param2.size() && m.param2[0] == systemKeys.begin[0] && m.type == MutationRef::ClearRange) {

View File

@ -1518,6 +1518,16 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
CommitTransactionRef &tr = recoveryCommitRequest.transaction;
int mmApplied = 0; // The number of mutations in tr.mutations that have been applied to the txnStateStore so far
if (self->lastEpochEnd != 0) {
Optional<Value> snapRecoveryFlag = self->txnStateStore->readValue(writeRecoveryKey).get();
TraceEvent("MasterRecoverySnap")
.detail("SnapRecoveryFlag", snapRecoveryFlag.present() ? snapRecoveryFlag.get().toString() : "N/A");
if (snapRecoveryFlag.present()) {
BinaryWriter bw(Unversioned());
tr.set(recoveryCommitRequest.arena, snapshotEndVersionKey, (bw << self->lastEpochEnd).toValue());
// Clear the key so multiple recoveries will not overwrite the first version recorded
self->txnStateStore->clear(singleKeyRange(writeRecoveryKey));
tr.clear(recoveryCommitRequest.arena, singleKeyRange(writeRecoveryKey));
}
if(self->forceRecovery) {
BinaryWriter bw(Unversioned());
tr.set(recoveryCommitRequest.arena, killStorageKey, (bw << self->safeLocality).toValue());

View File

@ -2,6 +2,7 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbmonitor/SimpleIni.h"
#include "fdbserver/Status.h"
@ -194,6 +195,21 @@ public: // workload functions
// create even keys before the snapshot
wait(self->_create_keys(cx, "snapKey"));
} else if (self->testID == 1) {
state Reference<ReadYourWritesTransaction> tr1(new ReadYourWritesTransaction(cx));
loop {
try {
tr1->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr1->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<Value> val = wait(tr1->get(writeRecoveryKey));
state Optional<Value> val2 = wait(tr1->get(snapshotEndVersionKey));
TraceEvent("CheckSpecialKey1")
.detail("WriteRecoveryValue", val.present() ? val.get().toString() : "N/A")
.detail("EndVersionValue", val2.present() ? val2.get().toString() : "N/A");
break;
} catch (Error& e) {
wait(tr1->onError(e));
}
}
// create a snapshot
state double toDelay = fmod(deterministicRandom()->randomUInt32(), self->maxSnapDelay);
TraceEvent("ToDelay").detail("Value", toDelay);
@ -221,6 +237,21 @@ public: // workload functions
}
}
}
state Reference<ReadYourWritesTransaction> tr2(new ReadYourWritesTransaction(cx));
loop {
try {
tr2->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr2->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<Value> val3 = wait(tr2->get(writeRecoveryKey));
state Optional<Value> val4 = wait(tr2->get(snapshotEndVersionKey));
TraceEvent("CheckSpecialKey2")
.detail("WriteRecoveryValue", val3.present() ? val3.get().toString() : "N/A")
.detail("EndVersionValue", val4.present() ? val4.get().toString() : "N/A");
break;
} catch (Error& e) {
wait(tr2->onError(e));
}
}
CSimpleIni ini;
ini.SetUnicode();
ini.LoadFile(self->restartInfoLocation.c_str());
@ -243,6 +274,21 @@ public: // workload functions
TraceEvent(SevWarnAlways, "BackupFailedSkippingRestoreCheck");
return Void();
}
state Reference<ReadYourWritesTransaction> tr3(new ReadYourWritesTransaction(cx));
loop {
try {
tr3->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr3->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<Value> val5 = wait(tr3->get(writeRecoveryKey));
state Optional<Value> val6 = wait(tr3->get(snapshotEndVersionKey));
TraceEvent("CheckSpecialKey3")
.detail("WriteRecoveryValue", val5.present() ? val5.get().toString() : "N/A")
.detail("EndVersionValue", val6.present() ? val6.get().toString() : "N/A");
break;
} catch (Error& e) {
wait(tr3->onError(e));
}
}
state KeySelector begin = firstGreaterOrEqual(normalKeys.begin);
state KeySelector end = firstGreaterOrEqual(normalKeys.end);
state int cnt = 0;