diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 07214d396f..dacf89e754 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -843,6 +843,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BG_MAX_SPLIT_FANOUT, 10 ); if( randomize && BUGGIFY ) BG_MAX_SPLIT_FANOUT = deterministicRandom()->randomInt(5, 15); init( BG_HOT_SNAPSHOT_VERSIONS, 5000000 ); + init( BG_CONSISTENCY_CHECK_ENABLED, true ); if (randomize && BUGGIFY) BG_CONSISTENCY_CHECK_ENABLED = false; /* on by default; disabled under randomize && BUGGIFY */ + init( BG_CONSISTENCY_CHECK_TARGET_SPEED_KB, 1000 ); if (randomize && BUGGIFY) BG_CONSISTENCY_CHECK_TARGET_SPEED_KB = BG_CONSISTENCY_CHECK_TARGET_SPEED_KB * deterministicRandom()->randomInt(2, 50) / 10; /* multiply before dividing: the integer quotient randomInt(2, 50) / 10 is 0 for any draw below 10, which used to zero this knob */ + init( BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM, 8 ); if( randomize && BUGGIFY ) BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM = 1; init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0; init( BLOB_WORKER_REQUEST_TIMEOUT, 5.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_REQUEST_TIMEOUT = 1.0; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 5f88a9975c..e15a3100b5 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -798,6 +798,8 @@ public: int BG_DELTA_BYTES_BEFORE_COMPACT; int BG_MAX_SPLIT_FANOUT; int BG_HOT_SNAPSHOT_VERSIONS; + int BG_CONSISTENCY_CHECK_ENABLED; /* the blob manager starts bgConsistencyCheck only when this is nonzero */ + int BG_CONSISTENCY_CHECK_TARGET_SPEED_KB; /* multiplied by 1024 and passed to the SpeedLimit that paces bgConsistencyCheck */ int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM; double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure diff --git a/fdbserver/BlobGranuleValidation.actor.cpp b/fdbserver/BlobGranuleValidation.actor.cpp new file mode 100644 index 0000000000..9f8168ffc8 --- /dev/null +++ b/fdbserver/BlobGranuleValidation.actor.cpp @@ -0,0 +1,165 @@ +/* + * BlobGranuleValidation.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. 
and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/BlobGranuleValidation.actor.h" +#include "flow/actorcompiler.h" // has to be last include + +ACTOR Future> readFromFDB(Database cx, KeyRange range) { /* Reads range from FDB with as many getRange calls as needed and returns the accumulated rows paired with the one read version they were all read at. */ + state bool first = true; /* true until v has been taken from a read */ + state Version v; + state RangeResult out; + state Transaction tr(cx); + state KeyRange currentRange = range; + loop { + try { + state RangeResult r = wait(tr.getRange(currentRange, CLIENT_KNOBS->TOO_MANY)); + Version grv = wait(tr.getReadVersion()); + // need consistent version snapshot of range + if (first) { + v = grv; + first = false; + } else if (v != grv) { + // reset the range and restart the read at a higher version + first = true; + out = RangeResult(); + currentRange = range; + tr.reset(); + continue; + } + out.arena().dependsOn(r.arena()); /* out's arena takes a dependency on r's arena before r's rows are appended */ + out.append(out.arena(), r.begin(), r.size()); + if (r.more) { + currentRange = KeyRangeRef(keyAfter(r.back().key), currentRange.end); /* resume just past the last key returned */ + } else { + break; + } + } catch (Error& e) { + wait(tr.onError(e)); /* hand the error to the transaction's onError, then loop and read again from currentRange */ + } + } + return std::pair(out, v); +} + +// FIXME: typedef this pair type and/or chunk list +ACTOR Future>>> readFromBlob( + Database cx, + Reference bstore, + KeyRange range, + Version beginVersion, + Version readVersion) { /* Fetches the granule chunks for range through readBlobGranules, retrying via onError until the call succeeds. */ + state RangeResult out; + state Standalone> chunks; + state Transaction tr(cx); + + loop { + try { + Standalone> chunks_ = + wait(tr.readBlobGranules(range, beginVersion, readVersion)); + chunks = 
chunks_; + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + for (const BlobGranuleChunkRef& chunk : chunks) { + RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, bstore)); /* materialize each chunk's rows through bstore and concatenate them in chunk order */ + out.arena().dependsOn(chunkRows.arena()); + out.append(out.arena(), chunkRows.begin(), chunkRows.size()); + } + return std::pair(out, chunks); +} + +bool compareFDBAndBlob(RangeResult fdb, + std::pair>> blob, + KeyRange range, + Version v, + bool debug) { /* Returns whether fdb equals blob.first. On a mismatch it logs a SevError GranuleMismatch trace event and, when debug is set, prints the first differing row followed by the chunk list. */ + bool correct = fdb == blob.first; + if (!correct) { + TraceEvent ev(SevError, "GranuleMismatch"); + ev.detail("RangeStart", range.begin) + .detail("RangeEnd", range.end) + .detail("Version", v) + .detail("FDBSize", fdb.size()) + .detail("BlobSize", blob.first.size()); + + if (debug) { + fmt::print("\nMismatch for [{0} - {1}) @ {2}. F({3}) B({4}):\n", /* five arguments follow, so indices stop at {4}; the old string still referenced a sixth */ + range.begin.printable(), + range.end.printable(), + v, + fdb.size(), + blob.first.size()); + + Optional lastCorrect; /* most recent matching row, printed as context for the first mismatch */ + for (int i = 0; i < std::max(fdb.size(), blob.first.size()); i++) { + if (i >= fdb.size() || i >= blob.first.size() || fdb[i] != blob.first[i]) { + printf(" Found mismatch at %d.\n", i); + if (lastCorrect.present()) { + printf(" last correct: %s=%s\n", + lastCorrect.get().key.printable().c_str(), + lastCorrect.get().value.printable().c_str()); + } + if (i < fdb.size()) { + printf(" FDB: %s=%s\n", fdb[i].key.printable().c_str(), fdb[i].value.printable().c_str()); + } else { + printf(" FDB: \n"); + } + if (i < blob.first.size()) { + printf(" BLB: %s=%s\n", + blob.first[i].key.printable().c_str(), + blob.first[i].value.printable().c_str()); + } else { + printf(" BLB: \n"); + } + printf("\n"); + break; /* only the first mismatch is reported */ + } + if (i < fdb.size()) { + lastCorrect = fdb[i]; + } else { + lastCorrect = blob.first[i]; + } + } + + printf("Chunks:\n"); + for (auto& chunk : blob.second) { + printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str()); + + printf(" SnapshotFile:\n %s\n", + 
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : ""); + printf(" DeltaFiles:\n"); + for (auto& df : chunk.deltaFiles) { + printf(" %s\n", df.toString().c_str()); + } + printf(" Deltas: (%d)", chunk.newDeltas.size()); + if (chunk.newDeltas.size() > 0) { + fmt::print(" with version [{0} - {1}]", /* versions of the first and last entries in chunk.newDeltas */ + chunk.newDeltas[0].version, + chunk.newDeltas[chunk.newDeltas.size() - 1].version); + } + fmt::print(" IncludedVersion: {}\n", chunk.includedVersion); + } + printf("\n"); + } + } + return correct; /* result of the fdb == blob.first comparison made at the top of the function */ +} \ No newline at end of file diff --git a/fdbserver/BlobGranuleValidation.actor.h b/fdbserver/BlobGranuleValidation.actor.h new file mode 100644 index 0000000000..749027055a --- /dev/null +++ b/fdbserver/BlobGranuleValidation.actor.h @@ -0,0 +1,53 @@ +/* + * BlobGranuleValidation.actor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_G_H) +#define FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_G_H +#include "fdbserver/BlobGranuleValidation.actor.g.h" +#elif !defined(FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_H) +#define FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_H + +#pragma once + +#include "flow/flow.h" +#include "fdbclient/BlobGranuleReader.actor.h" +#include "fdbclient/CommitTransaction.h" +#include "fdbclient/FDBTypes.h" +#include "fdbclient/BlobGranuleCommon.h" +#include "flow/actorcompiler.h" // has to be last include + +/* Contains utility functions for validating blob granule data */ + +ACTOR Future>>> readFromBlob( + Database cx, + Reference bstore, + KeyRange range, + Version beginVersion, + Version readVersion); /* reads the granule chunks for range via readBlobGranules(range, beginVersion, readVersion) and returns the rows read from them through bstore together with the chunk list */ + +ACTOR Future> readFromFDB(Database cx, KeyRange range); /* reads range from FDB and returns the rows paired with the read version they were read at */ + +bool compareFDBAndBlob(RangeResult fdb, + std::pair>> blob, + KeyRange range, + Version v, + bool debug); /* returns true when fdb equals blob.first; otherwise traces GranuleMismatch and, if debug is set, prints the details */ + +#endif /* NOTE(review): this header includes flow/actorcompiler.h but never flow/unactorcompiler.h; confirm against other .actor.h headers whether that include is required before the endif */ \ No newline at end of file diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index e0eadbca4e..912b0842e8 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -34,6 +34,7 @@ #include "fdbclient/SystemData.h" #include "fdbserver/BlobManagerInterface.h" #include "fdbserver/Knobs.h" +#include "fdbserver/BlobGranuleValidation.actor.h" #include "fdbserver/BlobGranuleServerCommon.actor.h" #include "fdbserver/QuietDatabase.h" #include "fdbserver/WaitFailure.h" @@ -195,10 +196,11 @@ struct RangeAssignment { }; // SOMEDAY: track worker's reads/writes eventually -struct BlobWorkerStats { +// FIXME: namespace? 
+struct BlobWorkerInfo { /* per-worker bookkeeping kept by the blob manager; currently only the number of granules assigned */ int numGranulesAssigned; - BlobWorkerStats(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {} + BlobWorkerInfo(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {} }; struct SplitEvaluation { @@ -218,12 +220,17 @@ struct BlobManagerStats { Counter granuleSplits; Counter granuleWriteHotSplits; + Counter ccGranulesChecked; /* the four cc counters are updated by bgConsistencyCheck */ + Counter ccRowsChecked; + Counter ccBytesChecked; + Counter ccMismatches; Future logger; // Current stats maintained for a given blob worker process explicit BlobManagerStats(UID id, double interval, std::unordered_map* workers) : cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc), - granuleWriteHotSplits("GranuleWriteHotSplits", cc) { + granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc), + ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc) { specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); }); logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics"); } @@ -241,7 +248,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted { Reference bstore; std::unordered_map workersById; - std::unordered_map workerStats; // mapping between workerID -> workerStats + std::unordered_map workerStats; // mapping between workerID -> workerStats std::unordered_set workerAddresses; std::unordered_set deadWorkers; KeyRangeMap workerAssignments; @@ -269,6 +276,19 @@ struct BlobManagerData : NonCopyable, ReferenceCounted { : id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById), knownBlobRanges(false, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {} + + // only initialize blob store if actually needed + void initBStore() { /* does nothing when bstore is already valid, so more than one actor may call it */ + if (!bstore.isValid()) { + if (BM_DEBUG) { + fmt::print("BM {} constructing backup container from {}\n", epoch, 
SERVER_KNOBS->BG_URL.c_str()); + } + bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); /* the container location comes from the BG_URL knob */ + if (BM_DEBUG) { + fmt::print("BM {} constructed backup container\n", epoch); + } + } + } }; ACTOR Future>> splitRange(Reference bmData, @@ -1519,7 +1539,7 @@ ACTOR Future checkBlobWorkerList(Reference bmData, Promis worker.locality.dcId() == bmData->dcId) { bmData->workerAddresses.insert(worker.stableAddress()); bmData->workersById[worker.id()] = worker; - bmData->workerStats[worker.id()] = BlobWorkerStats(); + bmData->workerStats[worker.id()] = BlobWorkerInfo(); /* a newly tracked worker starts with default BlobWorkerInfo (zero granules assigned) */ bmData->addActor.send(monitorBlobWorker(bmData, worker)); foundAnyNew = true; } else if (!bmData->workersById.count(worker.id())) { @@ -2022,7 +2042,7 @@ ACTOR Future initializeBlobWorker(Reference self, Recruit if (!self->workerAddresses.count(bwi.stableAddress()) && bwi.locality.dcId() == self->dcId) { self->workerAddresses.insert(bwi.stableAddress()); self->workersById[bwi.id()] = bwi; - self->workerStats[bwi.id()] = BlobWorkerStats(); + self->workerStats[bwi.id()] = BlobWorkerInfo(); self->addActor.send(monitorBlobWorker(self, bwi)); } else if (!self->workersById.count(bwi.id())) { self->addActor.send(killBlobWorker(self, bwi, false)); @@ -2554,14 +2574,7 @@ ACTOR Future pruneRange(Reference self, KeyRangeRef range * case that the timer is up before any new prune intents arrive). 
*/ ACTOR Future monitorPruneKeys(Reference self) { - // setup bstore - if (BM_DEBUG) { - fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str()); - } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); - if (BM_DEBUG) { - printf("BM constructed backup container\n"); - } + self->initBStore(); /* container setup now lives in BlobManagerData::initBStore */ loop { state Reference tr = makeReference(self->db); @@ -2730,6 +2743,73 @@ static void blobManagerExclusionSafetyCheck(Reference self, req.reply.send(reply); } +// FIXME: could eventually make this more thorough by storing some state in the DB or something +// FIXME: simpler solution could be to shuffle ranges +ACTOR Future bgConsistencyCheck(Reference bmData) { /* Loops forever: picks a random assigned range, reads it from FDB and from blob granules at the same version, compares the two and updates the cc counters, pacing itself through rateLimiter. */ + + state Reference rateLimiter = + Reference(new SpeedLimit(SERVER_KNOBS->BG_CONSISTENCY_CHECK_TARGET_SPEED_KB * 1024, 1)); + bmData->initBStore(); + + if (BM_DEBUG) { + fmt::print("BGCC starting\n"); + } + + loop { + if (g_network->isSimulated() && g_simulator.speedUpSimulation) { /* in simulation, exit once speedUpSimulation is set */ + if (BM_DEBUG) { + printf("BGCC stopping\n"); + } + return Void(); + } + + if (bmData->workersById.size() >= 1) { + int tries = 10; /* up to 10 draws for a range whose assignment value is not the default UID() */ + state KeyRange range; + while (tries > 0) { + auto randomRange = bmData->workerAssignments.randomRange(); + if (randomRange.value() != UID()) { + range = randomRange.range(); + break; + } + tries--; + } + + if (tries == 0) { + if (BM_DEBUG) { + printf("BGCC couldn't find random range to check, skipping\n"); + } + wait(rateLimiter->getAllowance(SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES)); /* nothing was read; request a snapshot-file-sized allowance before drawing again */ + } else { + state std::pair fdbResult = wait(readFromFDB(bmData->db, range)); + + std::pair>> blobResult = + wait(readFromBlob(bmData->db, bmData->bstore, range, 0, fdbResult.second)); /* blob read uses begin version 0 and the version the FDB read returned */ + + if (!compareFDBAndBlob(fdbResult.first, blobResult, range, fdbResult.second, BM_DEBUG)) { + ++bmData->stats.ccMismatches; + } + + int64_t bytesRead = fdbResult.first.expectedSize(); + + ++bmData->stats.ccGranulesChecked; + bmData->stats.ccRowsChecked += 
fdbResult.first.size(); + bmData->stats.ccBytesChecked += bytesRead; + + // clear fdb result to release memory since it is a state variable + fdbResult = std::pair(RangeResult(), 0); + + wait(rateLimiter->getAllowance(bytesRead)); /* pace the next check by the bytes just read from FDB */ + } + } else { + if (BM_DEBUG) { + fmt::print("BGCC found no workers, skipping\n"); /* the message has no replacement field, so the stray size argument was dropped */ + } + wait(delay(60.0)); /* workersById is empty; look again in 60 seconds */ + } + } +} + // Simulation validation that multiple blob managers aren't started with the same epoch static std::map managerEpochsSeen; @@ -2776,6 +2856,9 @@ ACTOR Future blobManager(BlobManagerInterface bmInterf, self->addActor.send(doLockChecks(self)); self->addActor.send(monitorClientRanges(self)); self->addActor.send(monitorPruneKeys(self)); + if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) { /* the consistency checker is started only when the knob is set */ + self->addActor.send(bgConsistencyCheck(self)); + } if (BUGGIFY) { self->addActor.send(chaosRangeMover(self)); diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 7970dd18ef..3ffe1febbd 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -7,6 +7,8 @@ set(FDBSERVER_SRCS BackupWorker.actor.cpp BlobGranuleServerCommon.actor.cpp BlobGranuleServerCommon.actor.h + BlobGranuleValidation.actor.cpp + BlobGranuleValidation.actor.h BlobManager.actor.cpp BlobManagerInterface.h BlobWorker.actor.cpp diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index fc6d3035ae..d7ffd4e92c 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -29,6 +29,7 @@ #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/ReadYourWrites.h" #include "fdbclient/SystemData.h" +#include "fdbserver/BlobGranuleValidation.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" @@ -271,36 +272,6 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { return 
Void(); } - // FIXME: typedef this pair type and/or chunk list - ACTOR Future>>> readFromBlob( - Database cx, - BlobGranuleCorrectnessWorkload* self, - KeyRange range, - Version beginVersion, - Version readVersion) { - state RangeResult out; - state Standalone> chunks; - state Transaction tr(cx); - - loop { - try { - Standalone> chunks_ = - wait(tr.readBlobGranules(range, beginVersion, readVersion)); - chunks = chunks_; - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - for (const BlobGranuleChunkRef& chunk : chunks) { - RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, self->bstore)); - out.arena().dependsOn(chunkRows.arena()); - out.append(out.arena(), chunkRows.begin(), chunkRows.size()); - } - return std::pair(out, chunks); - } - // handle retries + errors // It's ok to reset the transaction here because its read version is only used for reading the granule mapping from // the system keyspace @@ -326,7 +297,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { Version rv = wait(self->doGrv(&tr)); state Version readVersion = rv; std::pair>> blob = - wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion)); + wait(readFromBlob(cx, self->bstore, threadData->directoryRange, 0, readVersion)); fmt::print("Directory {0} got {1} RV {2}\n", threadData->directoryID, doSetup ? 
"initial" : "final", @@ -690,7 +661,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { } std::pair>> blob = - wait(self->readFromBlob(cx, self, range, beginVersion, readVersion)); + wait(readFromBlob(cx, self->bstore, range, beginVersion, readVersion)); self->validateResult(threadData, blob, startKey, endKey, beginVersion, readVersion); int resultBytes = blob.first.expectedSize(); @@ -884,7 +855,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion); } std::pair>> blob = - wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion)); + wait(readFromBlob(cx, self->bstore, threadData->directoryRange, 0, readVersion)); result = self->validateResult(threadData, blob, 0, std::numeric_limits::max(), 0, readVersion); finalRowsValidated = blob.first.size(); diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index ba49923bf1..a2f0062720 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -28,6 +28,7 @@ #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/ReadYourWrites.h" #include "fdbclient/SystemData.h" +#include "fdbserver/BlobGranuleValidation.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" @@ -167,154 +168,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload { } } - // assumes we can read the whole range in one transaction at a single version - ACTOR Future> readFromFDB(Database cx, KeyRange range) { - state bool first = true; - state Version v; - state RangeResult out; - state Transaction tr(cx); - state KeyRange currentRange = range; - loop { - try { - state RangeResult r = wait(tr.getRange(currentRange, CLIENT_KNOBS->TOO_MANY)); - Version grv = wait(tr.getReadVersion()); - // need consistent version snapshot of range 
- if (first) { - v = grv; - first = false; - } else if (v != grv) { - // reset the range and restart the read at a higher version - TraceEvent(SevDebug, "BGVFDBReadReset").detail("ReadVersion", v); - TEST(true); // BGV transaction reset - fmt::print("Resetting BGV GRV {0} -> {1}\n", v, grv); - first = true; - out = RangeResult(); - currentRange = range; - tr.reset(); - continue; - } - out.arena().dependsOn(r.arena()); - out.append(out.arena(), r.begin(), r.size()); - if (r.more) { - currentRange = KeyRangeRef(keyAfter(r.back().key), currentRange.end); - } else { - break; - } - } catch (Error& e) { - wait(tr.onError(e)); - } - } - return std::pair(out, v); - } - - // FIXME: typedef this pair type and/or chunk list - ACTOR Future>>> - readFromBlob(Database cx, BlobGranuleVerifierWorkload* self, KeyRange range, Version version) { - state RangeResult out; - state Standalone> chunks; - state Transaction tr(cx); - - loop { - try { - Standalone> chunks_ = wait(tr.readBlobGranules(range, 0, version)); - chunks = chunks_; - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - for (const BlobGranuleChunkRef& chunk : chunks) { - RangeResult chunkRows = wait(readBlobGranule(chunk, range, 0, version, self->bstore)); - out.arena().dependsOn(chunkRows.arena()); - out.append(out.arena(), chunkRows.begin(), chunkRows.size()); - } - return std::pair(out, chunks); - } - - bool compareResult(RangeResult fdb, - std::pair>> blob, - KeyRange range, - Version v, - bool initialRequest) { - bool correct = fdb == blob.first; - if (!correct) { - mismatches++; - TraceEvent ev(SevError, "GranuleMismatch"); - ev.detail("RangeStart", range.begin) - .detail("RangeEnd", range.end) - .detail("Version", v) - .detail("RequestType", initialRequest ? "RealTime" : "TimeTravel") - .detail("FDBSize", fdb.size()) - .detail("BlobSize", blob.first.size()); - - if (BGV_DEBUG) { - fmt::print("\nMismatch for [{0} - {1}) @ {2} ({3}). 
F({4}) B({5}):\n", - range.begin.printable(), - range.end.printable(), - v, - initialRequest ? "RealTime" : "TimeTravel", - fdb.size(), - blob.first.size()); - - Optional lastCorrect; - for (int i = 0; i < std::max(fdb.size(), blob.first.size()); i++) { - if (i >= fdb.size() || i >= blob.first.size() || fdb[i] != blob.first[i]) { - printf(" Found mismatch at %d.\n", i); - if (lastCorrect.present()) { - printf(" last correct: %s=%s\n", - lastCorrect.get().key.printable().c_str(), - lastCorrect.get().value.printable().c_str()); - } - if (i < fdb.size()) { - printf( - " FDB: %s=%s\n", fdb[i].key.printable().c_str(), fdb[i].value.printable().c_str()); - } else { - printf(" FDB: \n"); - } - if (i < blob.first.size()) { - printf(" BLB: %s=%s\n", - blob.first[i].key.printable().c_str(), - blob.first[i].value.printable().c_str()); - } else { - printf(" BLB: \n"); - } - printf("\n"); - break; - } - if (i < fdb.size()) { - lastCorrect = fdb[i]; - } else { - lastCorrect = blob.first[i]; - } - } - - printf("Chunks:\n"); - for (auto& chunk : blob.second) { - printf("[%s - %s)\n", - chunk.keyRange.begin.printable().c_str(), - chunk.keyRange.end.printable().c_str()); - - printf(" SnapshotFile:\n %s\n", - chunk.snapshotFile.present() ? 
chunk.snapshotFile.get().toString().c_str() : ""); - printf(" DeltaFiles:\n"); - for (auto& df : chunk.deltaFiles) { - printf(" %s\n", df.toString().c_str()); - } - printf(" Deltas: (%d)", chunk.newDeltas.size()); - if (chunk.newDeltas.size() > 0) { - fmt::print(" with version [{0} - {1}]", - chunk.newDeltas[0].version, - chunk.newDeltas[chunk.newDeltas.size() - 1].version); - } - fmt::print(" IncludedVersion: {}\n", chunk.includedVersion); - } - printf("\n"); - } - } - return correct; - } - struct OldRead { KeyRange range; Version v; @@ -469,21 +322,23 @@ struct BlobGranuleVerifierWorkload : TestWorkload { } } std::pair>> reReadResult = - wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v)); - self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false); + wait(readFromBlob(cx, self->bstore, oldRead.range, 0, oldRead.v)); + if (!compareFDBAndBlob(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, BGV_DEBUG)) { + self->mismatches++; + } self->timeTravelReads++; if (doPruning) { wait(self->killBlobWorkers(cx, self)); std::pair>> versionRead = - wait(self->readFromBlob(cx, self, oldRead.range, prevPruneVersion)); + wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPruneVersion)); try { Version minSnapshotVersion = newPruneVersion; for (auto& it : versionRead.second) { minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion); } std::pair>> versionRead = - wait(self->readFromBlob(cx, self, oldRead.range, minSnapshotVersion - 1)); + wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1)); ASSERT(false); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) { @@ -504,10 +359,10 @@ struct BlobGranuleVerifierWorkload : TestWorkload { int rIndex = deterministicRandom()->randomInt(0, self->granuleRanges.get().size()); state KeyRange range = self->granuleRanges.get()[rIndex]; - state std::pair fdb = wait(self->readFromFDB(cx, range)); + state std::pair fdb = wait(readFromFDB(cx, 
range)); std::pair>> blob = - wait(self->readFromBlob(cx, self, range, fdb.second)); - if (self->compareResult(fdb.first, blob, range, fdb.second, true)) { + wait(readFromBlob(cx, self->bstore, range, 0, fdb.second)); + if (compareFDBAndBlob(fdb.first, blob, range, fdb.second, BGV_DEBUG)) { // TODO: bias for immediately re-reading to catch rollback cases double reReadTime = currentTime + deterministicRandom()->random01() * self->timeTravelLimit; int memory = fdb.first.expectedSize(); @@ -516,6 +371,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload { timeTravelChecks[reReadTime] = OldRead(range, fdb.second, fdb.first); timeTravelChecksMemory += memory; } + } else { + self->mismatches++; } self->rowsRead += fdb.first.size(); self->bytesRead += fdb.first.expectedSize();