
Added basic blob granule consistency check

Josh Slocum 2022-03-31 12:36:01 -05:00
parent 268caa5ac8
commit cb918b9cef
8 changed files with 338 additions and 202 deletions
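In short: when the new BG_CONSISTENCY_CHECK_ENABLED knob is set, the blob manager starts a background actor that repeatedly picks a random assigned granule range, reads it from both FDB and blob storage, compares the two results, and throttles itself to roughly BG_CONSISTENCY_CHECK_TARGET_SPEED_KB. A condensed sketch of the wiring, quoting names from the hunks below (illustrative only, not an additional change in this commit):

// In blobManager() startup (BlobManager.actor.cpp hunk near the end of this diff):
if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) {
    self->addActor.send(bgConsistencyCheck(self));
}
// Each iteration of bgConsistencyCheck feeds the new BlobManagerStats counters:
//   CCGranulesChecked, CCRowsChecked, CCBytesChecked, CCMismatches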

@@ -843,6 +843,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BG_MAX_SPLIT_FANOUT, 10 ); if( randomize && BUGGIFY ) BG_MAX_SPLIT_FANOUT = deterministicRandom()->randomInt(5, 15);
init( BG_HOT_SNAPSHOT_VERSIONS, 5000000 );
init( BG_CONSISTENCY_CHECK_ENABLED, true ); if (randomize && BUGGIFY) BG_CONSISTENCY_CHECK_ENABLED = false;
init( BG_CONSISTENCY_CHECK_TARGET_SPEED_KB, 1000 ); if (randomize && BUGGIFY) BG_CONSISTENCY_CHECK_TARGET_SPEED_KB *= (deterministicRandom()->randomInt(2, 50) / 10);
init( BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM, 8 ); if( randomize && BUGGIFY ) BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM = 1;
init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
init( BLOB_WORKER_REQUEST_TIMEOUT, 5.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_REQUEST_TIMEOUT = 1.0;

@@ -798,6 +798,8 @@ public:
int BG_DELTA_BYTES_BEFORE_COMPACT;
int BG_MAX_SPLIT_FANOUT;
int BG_HOT_SNAPSHOT_VERSIONS;
int BG_CONSISTENCY_CHECK_ENABLED;
int BG_CONSISTENCY_CHECK_TARGET_SPEED_KB;
int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM;
double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure
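For readers unfamiliar with the knob plumbing: a server knob is declared as a member in ServerKnobs.h (this hunk) and given its default, plus an optional BUGGIFY randomization for simulation, in ServerKnobs::initialize() (first hunk); callers then read it through SERVER_KNOBS. A minimal sketch of the pattern, using a hypothetical knob name:

// ServerKnobs.h -- declaration (hypothetical example knob, for illustration only)
int BG_EXAMPLE_KNOB;

// ServerKnobs::initialize() -- default value, randomized under BUGGIFY in simulation
init( BG_EXAMPLE_KNOB, 42 ); if( randomize && BUGGIFY ) BG_EXAMPLE_KNOB = deterministicRandom()->randomInt(1, 100);

// Any fdbserver code -- read-only access
if (SERVER_KNOBS->BG_EXAMPLE_KNOB > 0) { /* ... */ }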

@@ -0,0 +1,165 @@
/*
* BlobGranuleValidation.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/BlobGranuleValidation.actor.h"
#include "flow/actorcompiler.h" // has to be last include
ACTOR Future<std::pair<RangeResult, Version>> readFromFDB(Database cx, KeyRange range) {
state bool first = true;
state Version v;
state RangeResult out;
state Transaction tr(cx);
state KeyRange currentRange = range;
loop {
try {
state RangeResult r = wait(tr.getRange(currentRange, CLIENT_KNOBS->TOO_MANY));
Version grv = wait(tr.getReadVersion());
// need consistent version snapshot of range
if (first) {
v = grv;
first = false;
} else if (v != grv) {
// reset the range and restart the read at a higher version
first = true;
out = RangeResult();
currentRange = range;
tr.reset();
continue;
}
out.arena().dependsOn(r.arena());
out.append(out.arena(), r.begin(), r.size());
if (r.more) {
currentRange = KeyRangeRef(keyAfter(r.back().key), currentRange.end);
} else {
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return std::pair(out, v);
}
// FIXME: typedef this pair type and/or chunk list
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>> readFromBlob(
Database cx,
Reference<BackupContainerFileSystem> bstore,
KeyRange range,
Version beginVersion,
Version readVersion) {
state RangeResult out;
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
state Transaction tr(cx);
loop {
try {
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ =
wait(tr.readBlobGranules(range, beginVersion, readVersion));
chunks = chunks_;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
for (const BlobGranuleChunkRef& chunk : chunks) {
RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, bstore));
out.arena().dependsOn(chunkRows.arena());
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
}
return std::pair(out, chunks);
}
bool compareFDBAndBlob(RangeResult fdb,
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob,
KeyRange range,
Version v,
bool debug) {
bool correct = fdb == blob.first;
if (!correct) {
TraceEvent ev(SevError, "GranuleMismatch");
ev.detail("RangeStart", range.begin)
.detail("RangeEnd", range.end)
.detail("Version", v)
.detail("FDBSize", fdb.size())
.detail("BlobSize", blob.first.size());
if (debug) {
fmt::print("\nMismatch for [{0} - {1}) @ {2} ({3}). F({4}) B({5}):\n",
range.begin.printable(),
range.end.printable(),
v,
fdb.size(),
blob.first.size());
Optional<KeyValueRef> lastCorrect;
for (int i = 0; i < std::max(fdb.size(), blob.first.size()); i++) {
if (i >= fdb.size() || i >= blob.first.size() || fdb[i] != blob.first[i]) {
printf(" Found mismatch at %d.\n", i);
if (lastCorrect.present()) {
printf(" last correct: %s=%s\n",
lastCorrect.get().key.printable().c_str(),
lastCorrect.get().value.printable().c_str());
}
if (i < fdb.size()) {
printf(" FDB: %s=%s\n", fdb[i].key.printable().c_str(), fdb[i].value.printable().c_str());
} else {
printf(" FDB: <missing>\n");
}
if (i < blob.first.size()) {
printf(" BLB: %s=%s\n",
blob.first[i].key.printable().c_str(),
blob.first[i].value.printable().c_str());
} else {
printf(" BLB: <missing>\n");
}
printf("\n");
break;
}
if (i < fdb.size()) {
lastCorrect = fdb[i];
} else {
lastCorrect = blob.first[i];
}
}
printf("Chunks:\n");
for (auto& chunk : blob.second) {
printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
fmt::print(" with version [{0} - {1}]",
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
}
printf("\n");
}
}
return correct;
}
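One possible way to resolve the "typedef this pair type and/or chunk list" FIXME above would be aliases along these lines (hypothetical names, not part of this commit):

using BlobGranuleChunks = Standalone<VectorRef<BlobGranuleChunkRef>>; // chunk list
using FDBRangeRead = std::pair<RangeResult, Version>;                 // rows + consistent read version
using BlobRangeRead = std::pair<RangeResult, BlobGranuleChunks>;      // rows + the chunks they came from

ACTOR Future<FDBRangeRead> readFromFDB(Database cx, KeyRange range);
ACTOR Future<BlobRangeRead> readFromBlob(Database cx,
                                         Reference<BackupContainerFileSystem> bstore,
                                         KeyRange range,
                                         Version beginVersion,
                                         Version readVersion);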

@@ -0,0 +1,53 @@
/*
* BlobGranuleValidation.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_G_H)
#define FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_G_H
#include "fdbserver/BlobGranuleValidation.actor.g.h"
#elif !defined(FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_H)
#define FDBSERVER_BLOBGRANULEVALIDATION_ACTOR_H
#pragma once
#include "flow/flow.h"
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "flow/actorcompiler.h" // has to be last include
/* Contains utility functions for validating blob granule data */
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>> readFromBlob(
Database cx,
Reference<BackupContainerFileSystem> bstore,
KeyRange range,
Version beginVersion,
Version readVersion);
ACTOR Future<std::pair<RangeResult, Version>> readFromFDB(Database cx, KeyRange range);
bool compareFDBAndBlob(RangeResult fdb,
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob,
KeyRange range,
Version v,
bool debug);
#endif
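Taken together, the helpers declared here form a single consistency-check primitive. A minimal sketch of a hypothetical caller (checkRangeConsistency is an invented name; it mirrors how bgConsistencyCheck and the workloads later in this diff compose the three functions):

ACTOR Future<bool> checkRangeConsistency(Database cx,
                                         Reference<BackupContainerFileSystem> bstore,
                                         KeyRange range,
                                         bool debug) {
	// Read from FDB first to pin a consistent read version, then read the same
	// range from blob storage at that version and compare row-for-row.
	state std::pair<RangeResult, Version> fdb = wait(readFromFDB(cx, range));
	std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
	    wait(readFromBlob(cx, bstore, range, 0 /* beginVersion */, fdb.second));
	return compareFDBAndBlob(fdb.first, blob, range, fdb.second, debug);
}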

@@ -34,6 +34,7 @@
#include "fdbclient/SystemData.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/BlobGranuleValidation.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/WaitFailure.h"
@@ -195,10 +196,11 @@ struct RangeAssignment {
};
// SOMEDAY: track worker's reads/writes eventually
struct BlobWorkerStats {
// FIXME: namespace?
struct BlobWorkerInfo {
int numGranulesAssigned;
BlobWorkerStats(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {}
BlobWorkerInfo(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {}
};
struct SplitEvaluation {
@@ -218,12 +220,17 @@ struct BlobManagerStats {
Counter granuleSplits;
Counter granuleWriteHotSplits;
Counter ccGranulesChecked;
Counter ccRowsChecked;
Counter ccBytesChecked;
Counter ccMismatches;
Future<Void> logger;
// Current stats maintained for a given blob worker process
explicit BlobManagerStats(UID id, double interval, std::unordered_map<UID, BlobWorkerInterface>* workers)
: cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
granuleWriteHotSplits("GranuleWriteHotSplits", cc) {
granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc),
ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc) {
specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
}
@@ -241,7 +248,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
Reference<BackupContainerFileSystem> bstore;
std::unordered_map<UID, BlobWorkerInterface> workersById;
std::unordered_map<UID, BlobWorkerStats> workerStats; // mapping between workerID -> workerStats
std::unordered_map<UID, BlobWorkerInfo> workerStats; // mapping between workerID -> workerStats
std::unordered_set<NetworkAddress> workerAddresses;
std::unordered_set<UID> deadWorkers;
KeyRangeMap<UID> workerAssignments;
@@ -269,6 +276,19 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
: id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
knownBlobRanges(false, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
recruitingStream(0) {}
// only initialize blob store if actually needed
void initBStore() {
if (!bstore.isValid()) {
if (BM_DEBUG) {
fmt::print("BM {} constructing backup container from {}\n", epoch, SERVER_KNOBS->BG_URL.c_str());
}
bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BM_DEBUG) {
fmt::print("BM {} constructed backup container\n", epoch);
}
}
}
};
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<BlobManagerData> bmData,
@@ -1519,7 +1539,7 @@ ACTOR Future<Void> checkBlobWorkerList(Reference<BlobManagerData> bmData, Promis
worker.locality.dcId() == bmData->dcId) {
bmData->workerAddresses.insert(worker.stableAddress());
bmData->workersById[worker.id()] = worker;
bmData->workerStats[worker.id()] = BlobWorkerStats();
bmData->workerStats[worker.id()] = BlobWorkerInfo();
bmData->addActor.send(monitorBlobWorker(bmData, worker));
foundAnyNew = true;
} else if (!bmData->workersById.count(worker.id())) {
@@ -2022,7 +2042,7 @@ ACTOR Future<Void> initializeBlobWorker(Reference<BlobManagerData> self, Recruit
if (!self->workerAddresses.count(bwi.stableAddress()) && bwi.locality.dcId() == self->dcId) {
self->workerAddresses.insert(bwi.stableAddress());
self->workersById[bwi.id()] = bwi;
self->workerStats[bwi.id()] = BlobWorkerStats();
self->workerStats[bwi.id()] = BlobWorkerInfo();
self->addActor.send(monitorBlobWorker(self, bwi));
} else if (!self->workersById.count(bwi.id())) {
self->addActor.send(killBlobWorker(self, bwi, false));
@@ -2554,14 +2574,7 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
* case that the timer is up before any new prune intents arrive).
*/
ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
// setup bstore
if (BM_DEBUG) {
fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str());
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BM_DEBUG) {
printf("BM constructed backup container\n");
}
self->initBStore();
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
@@ -2730,6 +2743,73 @@ static void blobManagerExclusionSafetyCheck(Reference<BlobManagerData> self,
req.reply.send(reply);
}
// FIXME: could eventually make this more thorough by storing some state in the DB or something
// FIXME: simpler solution could be to shuffle ranges
ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(SERVER_KNOBS->BG_CONSISTENCY_CHECK_TARGET_SPEED_KB * 1024, 1));
bmData->initBStore();
if (BM_DEBUG) {
fmt::print("BGCC starting\n");
}
loop {
if (g_network->isSimulated() && g_simulator.speedUpSimulation) {
if (BM_DEBUG) {
printf("BGCC stopping\n");
}
return Void();
}
if (bmData->workersById.size() >= 1) {
int tries = 10;
state KeyRange range;
while (tries > 0) {
auto randomRange = bmData->workerAssignments.randomRange();
if (randomRange.value() != UID()) {
range = randomRange.range();
break;
}
tries--;
}
if (tries == 0) {
if (BM_DEBUG) {
printf("BGCC couldn't find random range to check, skipping\n");
}
wait(rateLimiter->getAllowance(SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES));
} else {
state std::pair<RangeResult, Version> fdbResult = wait(readFromFDB(bmData->db, range));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blobResult =
wait(readFromBlob(bmData->db, bmData->bstore, range, 0, fdbResult.second));
if (!compareFDBAndBlob(fdbResult.first, blobResult, range, fdbResult.second, BM_DEBUG)) {
++bmData->stats.ccMismatches;
}
int64_t bytesRead = fdbResult.first.expectedSize();
++bmData->stats.ccGranulesChecked;
bmData->stats.ccRowsChecked += fdbResult.first.size();
bmData->stats.ccBytesChecked += bytesRead;
// clear fdb result to release memory since it is a state variable
fdbResult = std::pair(RangeResult(), 0);
wait(rateLimiter->getAllowance(bytesRead));
}
} else {
if (BM_DEBUG) {
fmt::print("BGCC found no workers, skipping\n", bmData->workerAssignments.size());
}
wait(delay(60.0));
}
}
}
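On pacing: bgConsistencyCheck converts BG_CONSISTENCY_CHECK_TARGET_SPEED_KB into a bytes-per-second budget and asks the rate limiter for an allowance equal to the bytes it just read, so sustained throughput stays near the target. A worked example under the default knob value (illustrative arithmetic only; assumes SpeedLimit(budgetBytes, windowSeconds) semantics for the constructor used above):

// budget = BG_CONSISTENCY_CHECK_TARGET_SPEED_KB * 1024 = 1000 * 1024 bytes per 1-second window
// After checking a granule whose FDB rows total ~4 MB:
//   wait(rateLimiter->getAllowance(bytesRead)) with bytesRead = 4 * 1024 * 1024
//   => the next iteration is delayed by roughly 4 * 1024 * 1024 / (1000 * 1024) ≈ 4.1 seconds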
// Simulation validation that multiple blob managers aren't started with the same epoch
static std::map<int64_t, UID> managerEpochsSeen;
@@ -2776,6 +2856,9 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
self->addActor.send(doLockChecks(self));
self->addActor.send(monitorClientRanges(self));
self->addActor.send(monitorPruneKeys(self));
if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) {
self->addActor.send(bgConsistencyCheck(self));
}
if (BUGGIFY) {
self->addActor.send(chaosRangeMover(self));

@@ -7,6 +7,8 @@ set(FDBSERVER_SRCS
BackupWorker.actor.cpp
BlobGranuleServerCommon.actor.cpp
BlobGranuleServerCommon.actor.h
BlobGranuleValidation.actor.cpp
BlobGranuleValidation.actor.h
BlobManager.actor.cpp
BlobManagerInterface.h
BlobWorker.actor.cpp

@@ -29,6 +29,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/BlobGranuleValidation.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@@ -271,36 +272,6 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
return Void();
}
// FIXME: typedef this pair type and/or chunk list
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>> readFromBlob(
Database cx,
BlobGranuleCorrectnessWorkload* self,
KeyRange range,
Version beginVersion,
Version readVersion) {
state RangeResult out;
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
state Transaction tr(cx);
loop {
try {
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ =
wait(tr.readBlobGranules(range, beginVersion, readVersion));
chunks = chunks_;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
for (const BlobGranuleChunkRef& chunk : chunks) {
RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, self->bstore));
out.arena().dependsOn(chunkRows.arena());
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
}
return std::pair(out, chunks);
}
// handle retries + errors
// It's ok to reset the transaction here because its read version is only used for reading the granule mapping from
// the system keyspace
@@ -326,7 +297,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
Version rv = wait(self->doGrv(&tr));
state Version readVersion = rv;
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion));
wait(readFromBlob(cx, self->bstore, threadData->directoryRange, 0, readVersion));
fmt::print("Directory {0} got {1} RV {2}\n",
threadData->directoryID,
doSetup ? "initial" : "final",
@@ -690,7 +661,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, range, beginVersion, readVersion));
wait(readFromBlob(cx, self->bstore, range, beginVersion, readVersion));
self->validateResult(threadData, blob, startKey, endKey, beginVersion, readVersion);
int resultBytes = blob.first.expectedSize();
@@ -884,7 +855,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion));
wait(readFromBlob(cx, self->bstore, threadData->directoryRange, 0, readVersion));
result = self->validateResult(threadData, blob, 0, std::numeric_limits<uint32_t>::max(), 0, readVersion);
finalRowsValidated = blob.first.size();

@@ -28,6 +28,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/BlobGranuleValidation.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
@@ -167,154 +168,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
}
// assumes we can read the whole range in one transaction at a single version
ACTOR Future<std::pair<RangeResult, Version>> readFromFDB(Database cx, KeyRange range) {
state bool first = true;
state Version v;
state RangeResult out;
state Transaction tr(cx);
state KeyRange currentRange = range;
loop {
try {
state RangeResult r = wait(tr.getRange(currentRange, CLIENT_KNOBS->TOO_MANY));
Version grv = wait(tr.getReadVersion());
// need consistent version snapshot of range
if (first) {
v = grv;
first = false;
} else if (v != grv) {
// reset the range and restart the read at a higher version
TraceEvent(SevDebug, "BGVFDBReadReset").detail("ReadVersion", v);
TEST(true); // BGV transaction reset
fmt::print("Resetting BGV GRV {0} -> {1}\n", v, grv);
first = true;
out = RangeResult();
currentRange = range;
tr.reset();
continue;
}
out.arena().dependsOn(r.arena());
out.append(out.arena(), r.begin(), r.size());
if (r.more) {
currentRange = KeyRangeRef(keyAfter(r.back().key), currentRange.end);
} else {
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return std::pair(out, v);
}
// FIXME: typedef this pair type and/or chunk list
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>>
readFromBlob(Database cx, BlobGranuleVerifierWorkload* self, KeyRange range, Version version) {
state RangeResult out;
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
state Transaction tr(cx);
loop {
try {
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ = wait(tr.readBlobGranules(range, 0, version));
chunks = chunks_;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
for (const BlobGranuleChunkRef& chunk : chunks) {
RangeResult chunkRows = wait(readBlobGranule(chunk, range, 0, version, self->bstore));
out.arena().dependsOn(chunkRows.arena());
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
}
return std::pair(out, chunks);
}
bool compareResult(RangeResult fdb,
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob,
KeyRange range,
Version v,
bool initialRequest) {
bool correct = fdb == blob.first;
if (!correct) {
mismatches++;
TraceEvent ev(SevError, "GranuleMismatch");
ev.detail("RangeStart", range.begin)
.detail("RangeEnd", range.end)
.detail("Version", v)
.detail("RequestType", initialRequest ? "RealTime" : "TimeTravel")
.detail("FDBSize", fdb.size())
.detail("BlobSize", blob.first.size());
if (BGV_DEBUG) {
fmt::print("\nMismatch for [{0} - {1}) @ {2} ({3}). F({4}) B({5}):\n",
range.begin.printable(),
range.end.printable(),
v,
initialRequest ? "RealTime" : "TimeTravel",
fdb.size(),
blob.first.size());
Optional<KeyValueRef> lastCorrect;
for (int i = 0; i < std::max(fdb.size(), blob.first.size()); i++) {
if (i >= fdb.size() || i >= blob.first.size() || fdb[i] != blob.first[i]) {
printf(" Found mismatch at %d.\n", i);
if (lastCorrect.present()) {
printf(" last correct: %s=%s\n",
lastCorrect.get().key.printable().c_str(),
lastCorrect.get().value.printable().c_str());
}
if (i < fdb.size()) {
printf(
" FDB: %s=%s\n", fdb[i].key.printable().c_str(), fdb[i].value.printable().c_str());
} else {
printf(" FDB: <missing>\n");
}
if (i < blob.first.size()) {
printf(" BLB: %s=%s\n",
blob.first[i].key.printable().c_str(),
blob.first[i].value.printable().c_str());
} else {
printf(" BLB: <missing>\n");
}
printf("\n");
break;
}
if (i < fdb.size()) {
lastCorrect = fdb[i];
} else {
lastCorrect = blob.first[i];
}
}
printf("Chunks:\n");
for (auto& chunk : blob.second) {
printf("[%s - %s)\n",
chunk.keyRange.begin.printable().c_str(),
chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
fmt::print(" with version [{0} - {1}]",
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
}
printf("\n");
}
}
return correct;
}
struct OldRead {
KeyRange range;
Version v;
@@ -469,21 +322,23 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, oldRead.v));
if (!compareFDBAndBlob(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, BGV_DEBUG)) {
self->mismatches++;
}
self->timeTravelReads++;
if (doPruning) {
wait(self->killBlobWorkers(cx, self));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, prevPruneVersion));
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPruneVersion));
try {
Version minSnapshotVersion = newPruneVersion;
for (auto& it : versionRead.second) {
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, minSnapshotVersion - 1));
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
@@ -504,10 +359,10 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int rIndex = deterministicRandom()->randomInt(0, self->granuleRanges.get().size());
state KeyRange range = self->granuleRanges.get()[rIndex];
state std::pair<RangeResult, Version> fdb = wait(self->readFromFDB(cx, range));
state std::pair<RangeResult, Version> fdb = wait(readFromFDB(cx, range));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, range, fdb.second));
if (self->compareResult(fdb.first, blob, range, fdb.second, true)) {
wait(readFromBlob(cx, self->bstore, range, 0, fdb.second));
if (compareFDBAndBlob(fdb.first, blob, range, fdb.second, BGV_DEBUG)) {
// TODO: bias for immediately re-reading to catch rollback cases
double reReadTime = currentTime + deterministicRandom()->random01() * self->timeTravelLimit;
int memory = fdb.first.expectedSize();
@@ -516,6 +371,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
timeTravelChecks[reReadTime] = OldRead(range, fdb.second, fdb.first);
timeTravelChecksMemory += memory;
}
} else {
self->mismatches++;
}
self->rowsRead += fdb.first.size();
self->bytesRead += fdb.first.expectedSize();