mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 18:02:31 +08:00
Finished implementing beginVersion
This commit is contained in:
parent
1b1182f414
commit
989dd8d7eb
@ -119,29 +119,50 @@ static void applyDelta(KeyRangeRef keyRange, MutationRef m, std::map<KeyRef, Val
|
|||||||
|
|
||||||
static void applyDeltas(const GranuleDeltas& deltas,
|
static void applyDeltas(const GranuleDeltas& deltas,
|
||||||
KeyRangeRef keyRange,
|
KeyRangeRef keyRange,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
Version& lastFileEndVersion,
|
Version& lastFileEndVersion,
|
||||||
std::map<KeyRef, ValueRef>& dataMap) {
|
std::map<KeyRef, ValueRef>& dataMap) {
|
||||||
if (!deltas.empty()) {
|
if (deltas.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
// check that consecutive delta file versions are disjoint
|
// check that consecutive delta file versions are disjoint
|
||||||
ASSERT(lastFileEndVersion < deltas.front().version);
|
ASSERT(lastFileEndVersion < deltas.front().version);
|
||||||
|
|
||||||
|
const MutationsAndVersionRef* mutationIt = deltas.begin();
|
||||||
|
// prune beginVersion if necessary
|
||||||
|
if (beginVersion > deltas.front().version) {
|
||||||
|
if (beginVersion > deltas.back().version) {
|
||||||
|
printf("beginVersion=%lld, deltas.front=%lld, deltas.back=%lld, deltas.size=%d\n",
|
||||||
|
beginVersion,
|
||||||
|
deltas.front().version,
|
||||||
|
deltas.back().version,
|
||||||
|
deltas.size());
|
||||||
}
|
}
|
||||||
for (const MutationsAndVersionRef& delta : deltas) {
|
ASSERT(beginVersion <= deltas.back().version);
|
||||||
if (delta.version > readVersion) {
|
// binary search for beginVersion
|
||||||
|
mutationIt = std::lower_bound(deltas.begin(),
|
||||||
|
deltas.end(),
|
||||||
|
MutationsAndVersionRef(beginVersion, 0),
|
||||||
|
MutationsAndVersionRef::OrderByVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
while (mutationIt != deltas.end()) {
|
||||||
|
if (mutationIt->version > readVersion) {
|
||||||
lastFileEndVersion = readVersion;
|
lastFileEndVersion = readVersion;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (auto& m : delta.mutations) {
|
for (auto& m : mutationIt->mutations) {
|
||||||
applyDelta(keyRange, m, dataMap);
|
applyDelta(keyRange, m, dataMap);
|
||||||
}
|
}
|
||||||
|
mutationIt++;
|
||||||
}
|
}
|
||||||
if (!deltas.empty()) {
|
|
||||||
lastFileEndVersion = deltas.back().version;
|
lastFileEndVersion = deltas.back().version;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
static Arena loadDeltaFile(StringRef deltaData,
|
static Arena loadDeltaFile(StringRef deltaData,
|
||||||
KeyRangeRef keyRange,
|
KeyRangeRef keyRange,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
Version& lastFileEndVersion,
|
Version& lastFileEndVersion,
|
||||||
std::map<KeyRef, ValueRef>& dataMap) {
|
std::map<KeyRef, ValueRef>& dataMap) {
|
||||||
@ -163,19 +184,18 @@ static Arena loadDeltaFile(StringRef deltaData,
|
|||||||
ASSERT(deltas[i].version <= deltas[i + 1].version);
|
ASSERT(deltas[i].version <= deltas[i + 1].version);
|
||||||
}
|
}
|
||||||
|
|
||||||
applyDeltas(deltas, keyRange, readVersion, lastFileEndVersion, dataMap);
|
applyDeltas(deltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
|
||||||
return parseArena;
|
return parseArena;
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
||||||
KeyRangeRef keyRange,
|
KeyRangeRef keyRange,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
Optional<StringRef> snapshotData,
|
Optional<StringRef> snapshotData,
|
||||||
StringRef deltaFileData[]) {
|
StringRef deltaFileData[]) {
|
||||||
// TODO REMOVE with early replying
|
// TODO REMOVE with early replying
|
||||||
ASSERT(readVersion == chunk.includedVersion);
|
ASSERT(readVersion == chunk.includedVersion);
|
||||||
ASSERT(chunk.snapshotFile.present());
|
|
||||||
ASSERT(snapshotData.present());
|
|
||||||
|
|
||||||
// Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
|
// Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
|
||||||
// will likely be tossed if there are a significant number of mutations, so we copy at the end instead of doing a
|
// will likely be tossed if there are a significant number of mutations, so we copy at the end instead of doing a
|
||||||
@ -195,13 +215,14 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
|||||||
fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
|
fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
|
||||||
}
|
}
|
||||||
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
|
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
|
||||||
Arena deltaArena = loadDeltaFile(deltaFileData[deltaIdx], keyRange, readVersion, lastFileEndVersion, dataMap);
|
Arena deltaArena =
|
||||||
|
loadDeltaFile(deltaFileData[deltaIdx], keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
|
||||||
arena.dependsOn(deltaArena);
|
arena.dependsOn(deltaArena);
|
||||||
}
|
}
|
||||||
if (BG_READ_DEBUG) {
|
if (BG_READ_DEBUG) {
|
||||||
fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size());
|
fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size());
|
||||||
}
|
}
|
||||||
applyDeltas(chunk.newDeltas, keyRange, readVersion, lastFileEndVersion, dataMap);
|
applyDeltas(chunk.newDeltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
|
||||||
|
|
||||||
RangeResult ret;
|
RangeResult ret;
|
||||||
for (auto& it : dataMap) {
|
for (auto& it : dataMap) {
|
||||||
@ -262,7 +283,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||||||
}
|
}
|
||||||
|
|
||||||
// materialize rows from chunk
|
// materialize rows from chunk
|
||||||
chunkRows = materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
|
chunkRows = materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
|
||||||
|
|
||||||
results.arena().dependsOn(chunkRows.arena());
|
results.arena().dependsOn(chunkRows.arena());
|
||||||
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
|
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
|
||||||
|
@ -33,6 +33,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||||||
|
|
||||||
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
||||||
KeyRangeRef keyRange,
|
KeyRangeRef keyRange,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
Optional<StringRef> snapshotData,
|
Optional<StringRef> snapshotData,
|
||||||
StringRef deltaFileData[]);
|
StringRef deltaFileData[]);
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#include "fdbclient/BlobGranuleReader.actor.h"
|
#include "fdbclient/BlobGranuleReader.actor.h"
|
||||||
#include "fdbclient/BlobWorkerCommon.h"
|
#include "fdbclient/BlobWorkerCommon.h"
|
||||||
#include "fdbclient/BlobWorkerInterface.h"
|
#include "fdbclient/BlobWorkerInterface.h"
|
||||||
|
#include "fdbclient/FDBTypes.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
// TODO more efficient data structure besides std::map? PTree is unnecessary since this isn't versioned, but some other
|
// TODO more efficient data structure besides std::map? PTree is unnecessary since this isn't versioned, but some other
|
||||||
@ -63,22 +64,25 @@ ACTOR Future<Standalone<StringRef>> readFile(Reference<BackupContainerFileSystem
|
|||||||
// sub-functions that BlobGranuleFiles actually exposes?
|
// sub-functions that BlobGranuleFiles actually exposes?
|
||||||
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
||||||
KeyRangeRef keyRange,
|
KeyRangeRef keyRange,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
Reference<BackupContainerFileSystem> bstore,
|
Reference<BackupContainerFileSystem> bstore,
|
||||||
Optional<BlobWorkerStats*> stats) {
|
Optional<BlobWorkerStats*> stats) {
|
||||||
|
|
||||||
// TODO REMOVE with early replying
|
// TODO REMOVE with early replying
|
||||||
ASSERT(readVersion == chunk.includedVersion);
|
ASSERT(readVersion == chunk.includedVersion);
|
||||||
ASSERT(chunk.snapshotFile.present());
|
|
||||||
|
|
||||||
state Arena arena;
|
state Arena arena;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Future<Standalone<StringRef>> readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get());
|
Future<Standalone<StringRef>> readSnapshotFuture;
|
||||||
state std::vector<Future<Standalone<StringRef>>> readDeltaFutures;
|
if (chunk.snapshotFile.present()) {
|
||||||
|
readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get());
|
||||||
if (stats.present()) {
|
if (stats.present()) {
|
||||||
++stats.get()->s3GetReqs;
|
++stats.get()->s3GetReqs;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
state std::vector<Future<Standalone<StringRef>>> readDeltaFutures;
|
||||||
|
|
||||||
readDeltaFutures.reserve(chunk.deltaFiles.size());
|
readDeltaFutures.reserve(chunk.deltaFiles.size());
|
||||||
for (BlobFilePointerRef deltaFile : chunk.deltaFiles) {
|
for (BlobFilePointerRef deltaFile : chunk.deltaFiles) {
|
||||||
@ -88,8 +92,12 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
state Standalone<StringRef> snapshotData = wait(readSnapshotFuture);
|
state Optional<StringRef> snapshotData; // not present if snapshotFile isn't present
|
||||||
arena.dependsOn(snapshotData.arena());
|
if (chunk.snapshotFile.present()) {
|
||||||
|
state Standalone<StringRef> s = wait(readSnapshotFuture);
|
||||||
|
arena.dependsOn(s.arena());
|
||||||
|
snapshotData = s;
|
||||||
|
}
|
||||||
|
|
||||||
state int numDeltaFiles = chunk.deltaFiles.size();
|
state int numDeltaFiles = chunk.deltaFiles.size();
|
||||||
state StringRef* deltaData = new (arena) StringRef[numDeltaFiles];
|
state StringRef* deltaData = new (arena) StringRef[numDeltaFiles];
|
||||||
@ -102,7 +110,7 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
|||||||
arena.dependsOn(data.arena());
|
arena.dependsOn(data.arena());
|
||||||
}
|
}
|
||||||
|
|
||||||
return materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
|
return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
|
||||||
|
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
throw e;
|
throw e;
|
||||||
@ -119,8 +127,8 @@ ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
|
|||||||
try {
|
try {
|
||||||
state int i;
|
state int i;
|
||||||
for (i = 0; i < reply.chunks.size(); i++) {
|
for (i = 0; i < reply.chunks.size(); i++) {
|
||||||
RangeResult chunkResult =
|
RangeResult chunkResult = wait(
|
||||||
wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore));
|
readBlobGranule(reply.chunks[i], request.keyRange, request.beginVersion, request.readVersion, bstore));
|
||||||
results.send(std::move(chunkResult));
|
results.send(std::move(chunkResult));
|
||||||
}
|
}
|
||||||
results.sendError(end_of_stream());
|
results.sendError(end_of_stream());
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
// the request
|
// the request
|
||||||
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
||||||
KeyRangeRef keyRange,
|
KeyRangeRef keyRange,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
Reference<BackupContainerFileSystem> bstore,
|
Reference<BackupContainerFileSystem> bstore,
|
||||||
Optional<BlobWorkerStats*> stats = Optional<BlobWorkerStats*>());
|
Optional<BlobWorkerStats*> stats = Optional<BlobWorkerStats*>());
|
||||||
|
@ -7442,6 +7442,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
|
|||||||
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
|
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
|
||||||
req.beginVersion = begin;
|
req.beginVersion = begin;
|
||||||
req.readVersion = rv;
|
req.readVersion = rv;
|
||||||
|
req.canCollapseBegin = true; // TODO make this a parameter once we support it
|
||||||
|
|
||||||
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
|
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
|
||||||
v.push_back(
|
v.push_back(
|
||||||
|
@ -834,7 +834,7 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
|
|||||||
rowsStream,
|
rowsStream,
|
||||||
false);
|
false);
|
||||||
RangeResult newGranule =
|
RangeResult newGranule =
|
||||||
wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore, &bwData->stats));
|
wait(readBlobGranule(chunk, metadata->keyRange, 0, version, bwData->bstore, &bwData->stats));
|
||||||
|
|
||||||
bwData->stats.bytesReadFromS3ForCompaction += compactBytesRead;
|
bwData->stats.bytesReadFromS3ForCompaction += compactBytesRead;
|
||||||
rowsStream.send(std::move(newGranule));
|
rowsStream.send(std::move(newGranule));
|
||||||
@ -2095,11 +2095,16 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
|
|||||||
|
|
||||||
ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
|
ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
|
||||||
if (BW_REQUEST_DEBUG) {
|
if (BW_REQUEST_DEBUG) {
|
||||||
fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ {3}\n",
|
fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ ",
|
||||||
bwData->id.toString(),
|
bwData->id.toString(),
|
||||||
req.keyRange.begin.printable(),
|
req.keyRange.begin.printable(),
|
||||||
req.keyRange.end.printable(),
|
req.keyRange.end.printable(),
|
||||||
req.readVersion);
|
req.readVersion);
|
||||||
|
if (req.beginVersion > 0) {
|
||||||
|
fmt::print("{0} - {1}\n", req.beginVersion, req.readVersion);
|
||||||
|
} else {
|
||||||
|
fmt::print("{}", req.readVersion);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
state bool didCollapse = false;
|
state bool didCollapse = false;
|
||||||
@ -2298,7 +2303,8 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||||||
ASSERT(metadata->cancelled.canBeSet());
|
ASSERT(metadata->cancelled.canBeSet());
|
||||||
|
|
||||||
// Right now we force a collapse if the version range crosses granule boundaries, for simplicity
|
// Right now we force a collapse if the version range crosses granule boundaries, for simplicity
|
||||||
if (chunkFiles.snapshotFiles.front().version < granuleBeginVersion) {
|
if (granuleBeginVersion <= chunkFiles.snapshotFiles.front().version) {
|
||||||
|
TEST(true); // collapsed begin version request because of boundaries
|
||||||
didCollapse = true;
|
didCollapse = true;
|
||||||
granuleBeginVersion = 0;
|
granuleBeginVersion = 0;
|
||||||
}
|
}
|
||||||
@ -2309,13 +2315,14 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||||||
|
|
||||||
chunkFiles.getFiles(granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena);
|
chunkFiles.getFiles(granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena);
|
||||||
if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) {
|
if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) {
|
||||||
|
TEST(true); // collapsed begin version request for efficiency
|
||||||
didCollapse = true;
|
didCollapse = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// new deltas (if version is larger than version of last delta file)
|
// new deltas (if version is larger than version of last delta file)
|
||||||
// FIXME: do trivial key bounds here if key range is not fully contained in request key
|
// FIXME: do trivial key bounds here if key range is not fully contained in request key
|
||||||
// range
|
// range
|
||||||
if (req.readVersion > metadata->durableDeltaVersion.get() && metadata->currentDeltas.size()) {
|
if (req.readVersion > metadata->durableDeltaVersion.get() && !metadata->currentDeltas.empty()) {
|
||||||
if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) {
|
if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) {
|
||||||
fmt::print("real-time read [{0} - {1}) @ {2} doesn't have mutations!! durable={3}, pending={4}\n",
|
fmt::print("real-time read [{0} - {1}) @ {2} doesn't have mutations!! durable={3}, pending={4}\n",
|
||||||
metadata->keyRange.begin.printable(),
|
metadata->keyRange.begin.printable(),
|
||||||
@ -2327,6 +2334,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||||||
|
|
||||||
// prune mutations based on begin version, if possible
|
// prune mutations based on begin version, if possible
|
||||||
ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion);
|
ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion);
|
||||||
|
// FIXME: I think we can remove this dependsOn since we are doing push_back_deep
|
||||||
rep.arena.dependsOn(metadata->currentDeltas.arena());
|
rep.arena.dependsOn(metadata->currentDeltas.arena());
|
||||||
MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin();
|
MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin();
|
||||||
if (granuleBeginVersion > metadata->currentDeltas.back().version) {
|
if (granuleBeginVersion > metadata->currentDeltas.back().version) {
|
||||||
|
@ -272,15 +272,20 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: typedef this pair type and/or chunk list
|
// FIXME: typedef this pair type and/or chunk list
|
||||||
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>>
|
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>> readFromBlob(
|
||||||
readFromBlob(Database cx, BlobGranuleCorrectnessWorkload* self, KeyRange range, Version version) {
|
Database cx,
|
||||||
|
BlobGranuleCorrectnessWorkload* self,
|
||||||
|
KeyRange range,
|
||||||
|
Version beginVersion,
|
||||||
|
Version readVersion) {
|
||||||
state RangeResult out;
|
state RangeResult out;
|
||||||
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
|
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
|
||||||
state Transaction tr(cx);
|
state Transaction tr(cx);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ = wait(tr.readBlobGranules(range, 0, version));
|
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ =
|
||||||
|
wait(tr.readBlobGranules(range, beginVersion, readVersion));
|
||||||
chunks = chunks_;
|
chunks = chunks_;
|
||||||
break;
|
break;
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
@ -289,7 +294,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const BlobGranuleChunkRef& chunk : chunks) {
|
for (const BlobGranuleChunkRef& chunk : chunks) {
|
||||||
RangeResult chunkRows = wait(readBlobGranule(chunk, range, version, self->bstore));
|
RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, self->bstore));
|
||||||
out.arena().dependsOn(chunkRows.arena());
|
out.arena().dependsOn(chunkRows.arena());
|
||||||
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
|
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
|
||||||
}
|
}
|
||||||
@ -321,7 +326,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
Version rv = wait(self->doGrv(&tr));
|
Version rv = wait(self->doGrv(&tr));
|
||||||
state Version readVersion = rv;
|
state Version readVersion = rv;
|
||||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
||||||
wait(self->readFromBlob(cx, self, threadData->directoryRange, readVersion));
|
wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion));
|
||||||
fmt::print("Directory {0} got {1} RV {2}\n",
|
fmt::print("Directory {0} got {1} RV {2}\n",
|
||||||
threadData->directoryID,
|
threadData->directoryID,
|
||||||
doSetup ? "initial" : "final",
|
doSetup ? "initial" : "final",
|
||||||
@ -349,6 +354,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
const Optional<Value>& blobValue,
|
const Optional<Value>& blobValue,
|
||||||
uint32_t startKey,
|
uint32_t startKey,
|
||||||
uint32_t endKey,
|
uint32_t endKey,
|
||||||
|
Version beginVersion,
|
||||||
Version readVersion,
|
Version readVersion,
|
||||||
const std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>& blob) {
|
const std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>& blob) {
|
||||||
threadData->mismatches++;
|
threadData->mismatches++;
|
||||||
@ -360,11 +366,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
ev.detail("DirectoryID", format("%08x", threadData->directoryID))
|
ev.detail("DirectoryID", format("%08x", threadData->directoryID))
|
||||||
.detail("RangeStart", format("%08x", startKey))
|
.detail("RangeStart", format("%08x", startKey))
|
||||||
.detail("RangeEnd", format("%08x", endKey))
|
.detail("RangeEnd", format("%08x", endKey))
|
||||||
|
.detail("BeginVersion", beginVersion)
|
||||||
.detail("Version", readVersion);
|
.detail("Version", readVersion);
|
||||||
fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3}\n",
|
fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3} - {4}\n",
|
||||||
format("%08x", threadData->directoryID),
|
format("%08x", threadData->directoryID),
|
||||||
format("%08x", startKey),
|
format("%08x", startKey),
|
||||||
format("%08x", endKey),
|
format("%08x", endKey),
|
||||||
|
beginVersion,
|
||||||
readVersion);
|
readVersion);
|
||||||
if (lastMatching.present()) {
|
if (lastMatching.present()) {
|
||||||
fmt::print(" last correct: {}\n", lastMatching.get().printable());
|
fmt::print(" last correct: {}\n", lastMatching.get().printable());
|
||||||
@ -456,6 +464,29 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
readVersion);
|
readVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// because each chunk could be separately collapsed or not if we set beginVersion, we have to track it by chunk
|
||||||
|
KeyRangeMap<Version> beginVersionByChunk;
|
||||||
|
beginVersionByChunk.insert(normalKeys, 0);
|
||||||
|
int beginCollapsed = 0;
|
||||||
|
int beginNotCollapsed = 0;
|
||||||
|
for (auto& chunk : blob.second) {
|
||||||
|
if (!chunk.snapshotFile.present()) {
|
||||||
|
ASSERT(beginVersion > 0);
|
||||||
|
ASSERT(chunk.snapshotVersion == invalidVersion);
|
||||||
|
beginCollapsed++;
|
||||||
|
beginVersionByChunk.insert(chunk.keyRange, beginVersion);
|
||||||
|
} else {
|
||||||
|
ASSERT(chunk.snapshotVersion != invalidVersion);
|
||||||
|
if (beginVersion > 0) {
|
||||||
|
beginNotCollapsed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TEST(beginCollapsed > 0); // BGCorrectness got collapsed request with beginVersion > 0
|
||||||
|
TEST(beginNotCollapsed > 0); // BGCorrectness got un-collapsed request with beginVersion > 0
|
||||||
|
TEST(beginCollapsed > 0 &&
|
||||||
|
beginNotCollapsed > 0); // BGCorrectness got both collapsed and uncollapsed in the same request!
|
||||||
|
|
||||||
while (checkIt != threadData->keyData.end() && checkIt->first < endKeyExclusive) {
|
while (checkIt != threadData->keyData.end() && checkIt->first < endKeyExclusive) {
|
||||||
uint32_t key = checkIt->first;
|
uint32_t key = checkIt->first;
|
||||||
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
|
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
|
||||||
@ -475,6 +506,16 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
for (; idIdx < checkIt->second.writes.size() && checkIt->second.writes[idIdx].writeVersion <= readVersion;
|
for (; idIdx < checkIt->second.writes.size() && checkIt->second.writes[idIdx].writeVersion <= readVersion;
|
||||||
idIdx++) {
|
idIdx++) {
|
||||||
Key nextKeyShouldBe = threadData->getKey(key, idIdx);
|
Key nextKeyShouldBe = threadData->getKey(key, idIdx);
|
||||||
|
Version keyBeginVersion = beginVersionByChunk.rangeContaining(nextKeyShouldBe).cvalue();
|
||||||
|
if (keyBeginVersion > checkIt->second.writes[idIdx].writeVersion) {
|
||||||
|
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
|
||||||
|
fmt::print("DBG READ: Skip ID {0} written @ {1} < beginVersion {2}\n",
|
||||||
|
idIdx,
|
||||||
|
checkIt->second.writes[idIdx].clearVersion,
|
||||||
|
keyBeginVersion);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
|
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
|
||||||
fmt::print("DBG READ: Checking ID {0} ({1}) written @ {2}\n",
|
fmt::print("DBG READ: Checking ID {0} ({1}) written @ {2}\n",
|
||||||
format("%08x", idIdx),
|
format("%08x", idIdx),
|
||||||
@ -491,6 +532,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
Optional<Value>(),
|
Optional<Value>(),
|
||||||
startKeyInclusive,
|
startKeyInclusive,
|
||||||
endKeyExclusive,
|
endKeyExclusive,
|
||||||
|
beginVersion,
|
||||||
readVersion,
|
readVersion,
|
||||||
blob);
|
blob);
|
||||||
return false;
|
return false;
|
||||||
@ -509,6 +551,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
Optional<Value>(),
|
Optional<Value>(),
|
||||||
startKeyInclusive,
|
startKeyInclusive,
|
||||||
endKeyExclusive,
|
endKeyExclusive,
|
||||||
|
beginVersion,
|
||||||
readVersion,
|
readVersion,
|
||||||
blob);
|
blob);
|
||||||
return false;
|
return false;
|
||||||
@ -523,6 +566,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
blob.first[resultIdx].value,
|
blob.first[resultIdx].value,
|
||||||
startKeyInclusive,
|
startKeyInclusive,
|
||||||
endKeyExclusive,
|
endKeyExclusive,
|
||||||
|
beginVersion,
|
||||||
readVersion,
|
readVersion,
|
||||||
blob);
|
blob);
|
||||||
return false;
|
return false;
|
||||||
@ -545,6 +589,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
Optional<Value>(),
|
Optional<Value>(),
|
||||||
startKeyInclusive,
|
startKeyInclusive,
|
||||||
endKeyExclusive,
|
endKeyExclusive,
|
||||||
|
beginVersion,
|
||||||
readVersion,
|
readVersion,
|
||||||
blob);
|
blob);
|
||||||
return false;
|
return false;
|
||||||
@ -565,6 +610,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
state double targetReadBytesPerSec = threadData->targetByteRate * 4;
|
state double targetReadBytesPerSec = threadData->targetByteRate * 4;
|
||||||
ASSERT(targetReadBytesPerSec > 0);
|
ASSERT(targetReadBytesPerSec > 0);
|
||||||
|
|
||||||
|
state Version beginVersion;
|
||||||
state Version readVersion;
|
state Version readVersion;
|
||||||
|
|
||||||
TraceEvent("BlobGranuleCorrectnessReaderStart").log();
|
TraceEvent("BlobGranuleCorrectnessReaderStart").log();
|
||||||
@ -610,26 +656,42 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
state KeyRange range = KeyRangeRef(threadData->getKey(startKey, 0), threadData->getKey(endKey, 0));
|
state KeyRange range = KeyRangeRef(threadData->getKey(startKey, 0), threadData->getKey(endKey, 0));
|
||||||
|
|
||||||
// pick read version
|
// pick read version
|
||||||
// TODO could also pick begin version here
|
|
||||||
ASSERT(threadData->writeVersions.back() >= threadData->minSuccessfulReadVersion);
|
ASSERT(threadData->writeVersions.back() >= threadData->minSuccessfulReadVersion);
|
||||||
|
size_t readVersionIdx;
|
||||||
// randomly choose up to date vs time travel read
|
// randomly choose up to date vs time travel read
|
||||||
if (deterministicRandom()->random01() < 0.5) {
|
if (deterministicRandom()->random01() < 0.5) {
|
||||||
threadData->reads++;
|
threadData->reads++;
|
||||||
|
readVersionIdx = threadData->writeVersions.size() - 1;
|
||||||
readVersion = threadData->writeVersions.back();
|
readVersion = threadData->writeVersions.back();
|
||||||
} else {
|
} else {
|
||||||
threadData->timeTravelReads++;
|
threadData->timeTravelReads++;
|
||||||
|
size_t startIdx = 0;
|
||||||
loop {
|
loop {
|
||||||
int readVersionIdx = deterministicRandom()->randomInt(0, threadData->writeVersions.size());
|
readVersionIdx = deterministicRandom()->randomInt(startIdx, threadData->writeVersions.size());
|
||||||
readVersion = threadData->writeVersions[readVersionIdx];
|
readVersion = threadData->writeVersions[readVersionIdx];
|
||||||
if (readVersion >= threadData->minSuccessfulReadVersion) {
|
if (readVersion >= threadData->minSuccessfulReadVersion) {
|
||||||
break;
|
break;
|
||||||
|
} else {
|
||||||
|
startIdx = readVersionIdx + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// randomly choose begin version or not
|
||||||
|
beginVersion = 0;
|
||||||
|
if (deterministicRandom()->random01() < 0.5) {
|
||||||
|
int startIdx = 0;
|
||||||
|
int endIdxExclusive = readVersionIdx + 1;
|
||||||
|
// Choose skewed towards later versions. It's ok if beginVersion isn't readable though because it
|
||||||
|
// will collapse
|
||||||
|
size_t beginVersionIdx = (size_t)std::sqrt(
|
||||||
|
deterministicRandom()->randomInt(startIdx * startIdx, endIdxExclusive * endIdxExclusive));
|
||||||
|
beginVersion = threadData->writeVersions[beginVersionIdx];
|
||||||
|
}
|
||||||
|
|
||||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
||||||
wait(self->readFromBlob(cx, self, range, readVersion));
|
wait(self->readFromBlob(cx, self, range, beginVersion, readVersion));
|
||||||
self->validateResult(threadData, blob, startKey, endKey, 0, readVersion);
|
self->validateResult(threadData, blob, startKey, endKey, beginVersion, readVersion);
|
||||||
|
|
||||||
int resultBytes = blob.first.expectedSize();
|
int resultBytes = blob.first.expectedSize();
|
||||||
threadData->rowsRead += blob.first.size();
|
threadData->rowsRead += blob.first.size();
|
||||||
@ -822,7 +884,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||||||
fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion);
|
fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion);
|
||||||
}
|
}
|
||||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
||||||
wait(self->readFromBlob(cx, self, threadData->directoryRange, readVersion));
|
wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion));
|
||||||
result = self->validateResult(threadData, blob, 0, std::numeric_limits<uint32_t>::max(), 0, readVersion);
|
result = self->validateResult(threadData, blob, 0, std::numeric_limits<uint32_t>::max(), 0, readVersion);
|
||||||
finalRowsValidated = blob.first.size();
|
finalRowsValidated = blob.first.size();
|
||||||
|
|
||||||
|
@ -225,7 +225,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const BlobGranuleChunkRef& chunk : chunks) {
|
for (const BlobGranuleChunkRef& chunk : chunks) {
|
||||||
RangeResult chunkRows = wait(readBlobGranule(chunk, range, version, self->bstore));
|
RangeResult chunkRows = wait(readBlobGranule(chunk, range, 0, version, self->bstore));
|
||||||
out.arena().dependsOn(chunkRows.arena());
|
out.arena().dependsOn(chunkRows.arena());
|
||||||
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
|
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user