mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 18:56:00 +08:00
Fixing code review comments.
This commit is contained in:
parent
b7ce9d996c
commit
11668bb359
@ -1460,22 +1460,22 @@ public:
|
||||
|
||||
TraceEvent("DBA_switchover_started");
|
||||
|
||||
state ReadYourWritesTransaction tr3(dest);
|
||||
loop {
|
||||
try {
|
||||
tr2.reset();
|
||||
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr2.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Version destVersion = wait(tr2.getReadVersion());
|
||||
tr3.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr3.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Version destVersion = wait(tr3.getReadVersion());
|
||||
if (destVersion <= commitVersion) {
|
||||
TraceEvent("DBA_switchover_version_upgrade").detail("src", commitVersion).detail("dest", destVersion);
|
||||
TEST(true); // Forcing dest backup cluster to higher version
|
||||
tr2.set(minRequiredCommitVersionKey, BinaryWriter::toValue(commitVersion+1, Unversioned()));
|
||||
Void _ = wait(tr2.commit());
|
||||
tr3.set(minRequiredCommitVersionKey, BinaryWriter::toValue(commitVersion+1, Unversioned()));
|
||||
Void _ = wait(tr3.commit());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
Void _ = wait(tr2.onError(e));
|
||||
Void _ = wait(tr3.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1513,6 +1513,7 @@ public:
|
||||
|
||||
ACTOR static Future<Void> abortBackup(DatabaseBackupAgent* backupAgent, Database cx, Key tagName, bool partial) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state UID logUid;
|
||||
state Value backupUid;
|
||||
@ -1542,9 +1543,9 @@ public:
|
||||
Optional<Value> lastApplied = wait(tr->get(log_uid.withPrefix(applyMutationsBeginRange.begin)));
|
||||
if (lastApplied.present()) {
|
||||
Version applied = BinaryReader::fromStringRef<Version>(lastApplied.get(), Unversioned());
|
||||
if (current < applied) {
|
||||
if (current <= applied) {
|
||||
TraceEvent("DBA_abort_version_upgrade").detail("src", applied).detail("dest", current);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
TEST(true); // Upgrading version of local database.
|
||||
tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(applied+1, Unversioned()));
|
||||
}
|
||||
}
|
||||
|
@ -220,6 +220,7 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
if (popVersion != 0 && popVersion < requested) {
|
||||
if (confChange) *confChange = true;
|
||||
TEST(true); // Recovering at a higher version.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -128,17 +128,15 @@ struct VersionStampWorkload : TestWorkload {
|
||||
cx = extraCluster->createDatabase(LiteralStringRef("DB")).get();
|
||||
}
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
// We specifically wish to grab the smalles read version that we can get and maintain it, to
|
||||
// have the strictest check we can on versionstamps monotonically increasing.
|
||||
state Version readVersion = wait(tr.getReadVersion());
|
||||
loop{
|
||||
try {
|
||||
Standalone<RangeResultRef> result = wait(tr.getRange(KeyRangeRef(self->vsValuePrefix, endOfRange(self->vsValuePrefix)), self->nodeCount + 1));
|
||||
ASSERT(result.size() <= self->nodeCount);
|
||||
if (self->key_commit.size() > 0) {
|
||||
ASSERT(result.size() > 0); // Even with failIfDataLost, we still shouldn't lose everything.
|
||||
}
|
||||
if (self->failIfDataLost) {
|
||||
ASSERT(result.size() == self->key_commit.size());
|
||||
}
|
||||
if (self->failIfDataLost) ASSERT(result.size() == self->key_commit.size());
|
||||
else TEST(result.size() > 0); // Not all data should always be lost.
|
||||
|
||||
//TraceEvent("VST_check0").detail("size", result.size()).detail("nodeCount", self->nodeCount).detail("key_commit", self->key_commit.size()).detail("readVersion", readVersion);
|
||||
for (auto it : result) {
|
||||
@ -153,7 +151,7 @@ struct VersionStampWorkload : TestWorkload {
|
||||
const auto& all_values = all_values_iter->second;
|
||||
|
||||
const auto& value_pair_iter = std::find_if(all_values.cbegin(), all_values.cend(),
|
||||
[parsedVersion](const std::pair<Version, Standalone<StringRef>>& pair) { return pair.first == parsedVersion; });
|
||||
[parsedVersion](const std::pair<Version, Standalone<StringRef>>& pair) { return pair.first == parsedVersion; });
|
||||
ASSERT(value_pair_iter != all_values.cend()); // The key exists, but we never wrote the timestamp.
|
||||
if (self->failIfDataLost) {
|
||||
auto last_element_iter = all_values.cend(); last_element_iter--;
|
||||
@ -169,9 +167,8 @@ struct VersionStampWorkload : TestWorkload {
|
||||
|
||||
Standalone<RangeResultRef> result = wait(tr.getRange(KeyRangeRef(self->vsKeyPrefix, endOfRange(self->vsKeyPrefix)), self->nodeCount + 1));
|
||||
ASSERT(result.size() <= self->nodeCount);
|
||||
if (self->failIfDataLost) {
|
||||
ASSERT(result.size() == self->versionStampKey_commit.size());
|
||||
}
|
||||
if (self->failIfDataLost) ASSERT(result.size() == self->versionStampKey_commit.size());
|
||||
else TEST(result.size() > 0); // Not all data should always be lost.
|
||||
|
||||
//TraceEvent("VST_check1").detail("size", result.size()).detail("vsKey_commit_size", self->versionStampKey_commit.size());
|
||||
for (auto it : result) {
|
||||
@ -225,6 +222,8 @@ struct VersionStampWorkload : TestWorkload {
|
||||
loop{
|
||||
Void _ = wait(poisson(&lastTime, delay));
|
||||
|
||||
state bool cx_is_primary = true;
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state Key key = self->keyForIndex(g_random->randomInt(0, self->nodeCount));
|
||||
state Value value = std::string(g_random->randomInt(10, 100), 'x');
|
||||
state Key versionStampKey = self->versionStampKeyForIndex(g_random->randomInt(0, self->nodeCount));
|
||||
@ -235,7 +234,6 @@ struct VersionStampWorkload : TestWorkload {
|
||||
state Version committedVersion;
|
||||
loop{
|
||||
state bool error = false;
|
||||
state ReadYourWritesTransaction tr(g_simulator.extraDB != nullptr ? (g_random->random01() < 0.5 ? extraDB : cx) : cx);
|
||||
//TraceEvent("VST_commit_begin").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).detail("clear", printable(range));
|
||||
try {
|
||||
tr.atomicOp(key, value, MutationRef::SetVersionstampedValue);
|
||||
@ -249,19 +247,19 @@ struct VersionStampWorkload : TestWorkload {
|
||||
committedVersionStamp = committedVersionStamp_;
|
||||
}
|
||||
catch (Error &e) {
|
||||
if (e.code() == error_code_database_locked) {
|
||||
state Error err = e;
|
||||
if (err.code() == error_code_database_locked) {
|
||||
//TraceEvent("VST_commit_database_locked");
|
||||
cx_is_primary = !cx_is_primary;
|
||||
tr = ReadYourWritesTransaction(cx_is_primary ? cx : extraDB);
|
||||
break;
|
||||
}
|
||||
// There's a senseless amount of duplication in this block, because Flow doesn't save `e` across suspend/resume points.
|
||||
if (e.code() == error_code_commit_unknown_result) {
|
||||
} else if (err.code() == error_code_commit_unknown_result) {
|
||||
//TraceEvent("VST_commit_unknown_result").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).error(e);
|
||||
Void _ = wait(tr.onError(e));
|
||||
loop {
|
||||
state ReadYourWritesTransaction cur_tr(cx_is_primary ? cx : extraDB);
|
||||
cur_tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> vs_value = wait(tr.get(key));
|
||||
Optional<Value> vs_value = wait(cur_tr.get(key));
|
||||
if (!vs_value.present()) {
|
||||
error = true;
|
||||
break;
|
||||
@ -283,20 +281,20 @@ struct VersionStampWorkload : TestWorkload {
|
||||
}
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr.onError(e));
|
||||
Void _ = wait(cur_tr.onError(e));
|
||||
}
|
||||
}
|
||||
if (error) {
|
||||
//TraceEvent("VST_commit_failed").detail("key", printable(key)).detail("vsKey", printable(versionStampKey));
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
//TraceEvent("VST_commit_failed").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).error(e);
|
||||
Void _ = wait(tr.onError(e));
|
||||
continue;
|
||||
error = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (error) {
|
||||
//TraceEvent("VST_commit_failed").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).error(e);
|
||||
Void _ = wait(tr.onError(err));
|
||||
continue;
|
||||
}
|
||||
|
||||
const Standalone<StringRef> vsKeyKey = versionStampKey.removePrefix(self->vsKeyPrefix).substr(4, 16);
|
||||
const auto& committedVersionPair = std::make_pair(committedVersion, committedVersionStamp);
|
||||
//TraceEvent("VST_commit_success").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).detail("vsKeyKey", printable(vsKeyKey)).detail("clear", printable(range)).detail("version", tr.getCommittedVersion()).detail("vsValue", printable(committedVersionPair.second));
|
||||
|
@ -13,4 +13,4 @@ testTitle=VersionStampBackupToDB
|
||||
machinesToKill=10
|
||||
machinesToLeave=3
|
||||
reboot=true
|
||||
testDuration=80.0
|
||||
testDuration=60.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user