Adding check for merge convergence

This commit is contained in:
Josh Slocum 2022-07-14 20:32:08 -05:00
parent 407300bfa6
commit 866dda5763
5 changed files with 41 additions and 3 deletions

View File

@ -166,3 +166,24 @@ bool compareFDBAndBlob(RangeResult fdb,
}
return correct;
}
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
	// Repeatedly clear `range` and poll its blob granule ranges; finish as soon as exactly one range is
	// reported for it.
	state Transaction tr(cx);
	loop {
		try {
			Standalone<VectorRef<KeyRangeRef>> granuleRanges = wait(tr.getBlobGranuleRanges(range));
			const bool singleRangeLeft = (granuleRanges.size() == 1);
			if (singleRangeLeft) {
				return Void();
			}

			// The range count is not 1 yet: issue the clear and commit it.
			TEST(true); // clearAndAwaitMerge doing clear
			tr.clear(range);
			wait(tr.commit());

			// Pause for 30 seconds before the next check, then reset the transaction for the next attempt.
			wait(delay(30.0));
			tr.reset();
		} catch (Error& err) {
			// Hand the error to the transaction's onError and go around the loop again once it completes.
			wait(tr.onError(err));
		}
	}
}

View File

@ -1939,7 +1939,7 @@ ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
}
// This code block must execute without a wait for the lock checks (isMergeActive, mergeCandidates) to not
// deadlock and to avoid merge-merge races.
if ((!g_network->isSimulated() || !g_simulator.speedUpSimulation) && !bmData->isMergeActive(bestGranuleRange)) {
if (!bmData->isMergeActive(bestGranuleRange)) {
// check to avoid races where a split eval came in while merge was evaluating
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(bestGranuleRange);
bool mergeStillOk = true;

View File

@ -51,6 +51,8 @@ bool compareFDBAndBlob(RangeResult fdb,
Version v,
bool debug);
// Repeatedly clears `range` and re-checks its blob granule ranges, returning once exactly one range is reported.
// Between attempts it commits the clear and sleeps before checking again; transaction errors are retried.
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
#include "flow/unactorcompiler.h"
#endif

View File

@ -149,6 +149,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
// parameters global across all clients
int64_t targetByteRate;
bool doMergeCheckAtEnd;
std::vector<Reference<ThreadData>> directories;
std::vector<Future<Void>> clients;
@ -162,6 +163,9 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
// different parameters within those constraints
int64_t randomness = sharedRandomNumber;
doMergeCheckAtEnd = randomness % 10 == 0;
randomness /= 10;
// randomize between low and high directory count
int64_t targetDirectories = 1 + (randomness % 8);
randomness /= 8;
@ -910,7 +914,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
}
wait(self->checkTenantRanges(self, cx, threadData));
bool initialCheck = result;
state bool initialCheck = result;
result &= threadData->mismatches == 0 && (threadData->timeTravelTooOld == 0);
fmt::print("Blob Granule Workload Directory {0} {1}:\n", threadData->directoryID, result ? "passed" : "failed");
@ -933,6 +937,11 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
// For some reason simulation is still passing when this fails?.. so assert for now
ASSERT(result);
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && self->doMergeCheckAtEnd) {
TEST(true); // BGCorrectness clearing database and awaiting merge
wait(clearAndAwaitMerge(cx, threadData->directoryRange));
}
return result;
}

View File

@ -451,7 +451,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (BGV_DEBUG && startReadVersion != readVersion) {
fmt::print("Availability check updated read version from {0} to {1}\n", startReadVersion, readVersion);
}
bool result = availabilityPassed && self->mismatches == 0 && (checks > 0) && (self->timeTravelTooOld == 0);
state bool result =
availabilityPassed && self->mismatches == 0 && (checks > 0) && (self->timeTravelTooOld == 0);
fmt::print("Blob Granule Verifier {0} {1}:\n", self->clientId, result ? "passed" : "failed");
fmt::print(" {} successful final granule checks\n", checks);
fmt::print(" {} failed final granule checks\n", availabilityPassed ? 0 : 1);
@ -470,6 +471,11 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// For some reason simulation is still passing when this fails?.. so assert for now
ASSERT(result);
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
TEST(true); // BGV clearing database and awaiting merge
wait(clearAndAwaitMerge(cx, normalKeys));
}
return result;
}