diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index 6c2cd83a5f..67b119531b 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -119,29 +119,50 @@ static void applyDelta(KeyRangeRef keyRange, MutationRef m, std::map& dataMap) { - if (!deltas.empty()) { - // check that consecutive delta file versions are disjoint - ASSERT(lastFileEndVersion < deltas.front().version); + if (deltas.empty()) { + return; } - for (const MutationsAndVersionRef& delta : deltas) { - if (delta.version > readVersion) { + // check that consecutive delta file versions are disjoint + ASSERT(lastFileEndVersion < deltas.front().version); + + const MutationsAndVersionRef* mutationIt = deltas.begin(); + // prune beginVersion if necessary + if (beginVersion > deltas.front().version) { + if (beginVersion > deltas.back().version) { + printf("beginVersion=%lld, deltas.front=%lld, deltas.back=%lld, deltas.size=%d\n", + beginVersion, + deltas.front().version, + deltas.back().version, + deltas.size()); + } + ASSERT(beginVersion <= deltas.back().version); + // binary search for beginVersion + mutationIt = std::lower_bound(deltas.begin(), + deltas.end(), + MutationsAndVersionRef(beginVersion, 0), + MutationsAndVersionRef::OrderByVersion()); + } + + while (mutationIt != deltas.end()) { + if (mutationIt->version > readVersion) { lastFileEndVersion = readVersion; return; } - for (auto& m : delta.mutations) { + for (auto& m : mutationIt->mutations) { applyDelta(keyRange, m, dataMap); } + mutationIt++; } - if (!deltas.empty()) { - lastFileEndVersion = deltas.back().version; - } + lastFileEndVersion = deltas.back().version; } static Arena loadDeltaFile(StringRef deltaData, KeyRangeRef keyRange, + Version beginVersion, Version readVersion, Version& lastFileEndVersion, std::map& dataMap) { @@ -163,19 +184,18 @@ static Arena loadDeltaFile(StringRef deltaData, ASSERT(deltas[i].version <= deltas[i + 1].version); } - applyDeltas(deltas, keyRange, readVersion, lastFileEndVersion, dataMap); + applyDeltas(deltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap); return parseArena; } RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, KeyRangeRef keyRange, + Version beginVersion, Version readVersion, Optional snapshotData, StringRef deltaFileData[]) { // TODO REMOVE with early replying ASSERT(readVersion == chunk.includedVersion); - ASSERT(chunk.snapshotFile.present()); - ASSERT(snapshotData.present()); // Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files, // will likely be tossed if there are a significant number of mutations, so we copy at the end instead of doing a @@ -195,13 +215,14 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, fmt::print("Applying {} delta files\n", chunk.deltaFiles.size()); } for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) { - Arena deltaArena = loadDeltaFile(deltaFileData[deltaIdx], keyRange, readVersion, lastFileEndVersion, dataMap); + Arena deltaArena = + loadDeltaFile(deltaFileData[deltaIdx], keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap); arena.dependsOn(deltaArena); } if (BG_READ_DEBUG) { fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size()); } - applyDeltas(chunk.newDeltas, keyRange, readVersion, lastFileEndVersion, dataMap); + applyDeltas(chunk.newDeltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap); RangeResult ret; for (auto& it : dataMap) { @@ -262,7 +283,7 @@ ErrorOr loadAndMaterializeBlobGranules(const Standalone loadAndMaterializeBlobGranules(const Standalone snapshotData, StringRef deltaFileData[]); diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp index 0b9a4fba30..2fde30be0d 100644 --- a/fdbclient/BlobGranuleReader.actor.cpp +++ b/fdbclient/BlobGranuleReader.actor.cpp @@ -28,6 +28,7 @@ #include "fdbclient/BlobGranuleReader.actor.h" #include "fdbclient/BlobWorkerCommon.h" #include "fdbclient/BlobWorkerInterface.h" +#include "fdbclient/FDBTypes.h" #include "flow/actorcompiler.h" // This must be the last #include. // TODO more efficient data structure besides std::map? PTree is unnecessary since this isn't versioned, but some other @@ -63,22 +64,25 @@ ACTOR Future> readFile(Reference readBlobGranule(BlobGranuleChunkRef chunk, KeyRangeRef keyRange, + Version beginVersion, Version readVersion, Reference bstore, Optional stats) { // TODO REMOVE with early replying ASSERT(readVersion == chunk.includedVersion); - ASSERT(chunk.snapshotFile.present()); state Arena arena; try { - Future> readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get()); - state std::vector>> readDeltaFutures; - if (stats.present()) { - ++stats.get()->s3GetReqs; + Future> readSnapshotFuture; + if (chunk.snapshotFile.present()) { + readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get()); + if (stats.present()) { + ++stats.get()->s3GetReqs; + } } + state std::vector>> readDeltaFutures; readDeltaFutures.reserve(chunk.deltaFiles.size()); for (BlobFilePointerRef deltaFile : chunk.deltaFiles) { @@ -88,8 +92,12 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk, } } - state Standalone snapshotData = wait(readSnapshotFuture); - arena.dependsOn(snapshotData.arena()); + state Optional snapshotData; // not present if snapshotFile isn't present + if (chunk.snapshotFile.present()) { + state Standalone s = wait(readSnapshotFuture); + arena.dependsOn(s.arena()); + snapshotData = s; + } state int numDeltaFiles = chunk.deltaFiles.size(); state StringRef* deltaData = new (arena) StringRef[numDeltaFiles]; @@ -102,7 +110,7 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk, arena.dependsOn(data.arena()); } - return materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData); + return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData); } catch (Error& e) { throw e; @@ -119,8 +127,8 @@ ACTOR Future readBlobGranules(BlobGranuleFileRequest request, try { state int i; for (i = 0; i < reply.chunks.size(); i++) { - RangeResult chunkResult = - wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore)); + RangeResult chunkResult = wait( + readBlobGranule(reply.chunks[i], request.keyRange, request.beginVersion, request.readVersion, bstore)); results.send(std::move(chunkResult)); } results.sendError(end_of_stream()); diff --git a/fdbclient/BlobGranuleReader.actor.h b/fdbclient/BlobGranuleReader.actor.h index 1b168ebc5d..958dc817e8 100644 --- a/fdbclient/BlobGranuleReader.actor.h +++ b/fdbclient/BlobGranuleReader.actor.h @@ -40,6 +40,7 @@ // the request ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk, KeyRangeRef keyRange, + Version beginVersion, Version readVersion, Reference bstore, Optional stats = Optional()); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index a46c16641d..af465df0c9 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7442,6 +7442,7 @@ ACTOR Future>> readBlobGranulesActor( req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey)); req.beginVersion = begin; req.readVersion = rv; + req.canCollapseBegin = true; // TODO make this a parameter once we support it std::vector>> v; v.push_back( diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 5b6d22c2b1..5c6bba8a8e 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -834,7 +834,7 @@ ACTOR Future compactFromBlob(Reference bwData, rowsStream, false); RangeResult newGranule = - wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore, &bwData->stats)); + wait(readBlobGranule(chunk, metadata->keyRange, 0, version, bwData->bstore, &bwData->stats)); bwData->stats.bytesReadFromS3ForCompaction += compactBytesRead; rowsStream.send(std::move(newGranule)); @@ -2095,11 +2095,16 @@ ACTOR Future waitForVersion(Reference metadata, Version v ACTOR Future doBlobGranuleFileRequest(Reference bwData, BlobGranuleFileRequest req) { if (BW_REQUEST_DEBUG) { - fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ {3}\n", + fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ ", bwData->id.toString(), req.keyRange.begin.printable(), req.keyRange.end.printable(), req.readVersion); + if (req.beginVersion > 0) { + fmt::print("{0} - {1}\n", req.beginVersion, req.readVersion); + } else { + fmt::print("{}", req.readVersion); + } } state bool didCollapse = false; @@ -2298,7 +2303,8 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl ASSERT(metadata->cancelled.canBeSet()); // Right now we force a collapse if the version range crosses granule boundaries, for simplicity - if (chunkFiles.snapshotFiles.front().version < granuleBeginVersion) { + if (granuleBeginVersion <= chunkFiles.snapshotFiles.front().version) { + TEST(true); // collapsed begin version request because of boundaries didCollapse = true; granuleBeginVersion = 0; } @@ -2309,13 +2315,14 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl chunkFiles.getFiles(granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena); if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) { + TEST(true); // collapsed begin version request for efficiency didCollapse = true; } // new deltas (if version is larger than version of last delta file) // FIXME: do trivial key bounds here if key range is not fully contained in request key // range - if (req.readVersion > metadata->durableDeltaVersion.get() && metadata->currentDeltas.size()) { + if (req.readVersion > metadata->durableDeltaVersion.get() && !metadata->currentDeltas.empty()) { if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) { fmt::print("real-time read [{0} - {1}) @ {2} doesn't have mutations!! durable={3}, pending={4}\n", metadata->keyRange.begin.printable(), @@ -2327,6 +2334,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl // prune mutations based on begin version, if possible ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion); + // FIXME: I think we can remove this dependsOn since we are doing push_back_deep rep.arena.dependsOn(metadata->currentDeltas.arena()); MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin(); if (granuleBeginVersion > metadata->currentDeltas.back().version) { diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index ea43cebbb7..e0d309954e 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -272,15 +272,20 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { } // FIXME: typedef this pair type and/or chunk list - ACTOR Future>>> - readFromBlob(Database cx, BlobGranuleCorrectnessWorkload* self, KeyRange range, Version version) { + ACTOR Future>>> readFromBlob( + Database cx, + BlobGranuleCorrectnessWorkload* self, + KeyRange range, + Version beginVersion, + Version readVersion) { state RangeResult out; state Standalone> chunks; state Transaction tr(cx); loop { try { - Standalone> chunks_ = wait(tr.readBlobGranules(range, 0, version)); + Standalone> chunks_ = + wait(tr.readBlobGranules(range, beginVersion, readVersion)); chunks = chunks_; break; } catch (Error& e) { @@ -289,7 +294,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { } for (const BlobGranuleChunkRef& chunk : chunks) { - RangeResult chunkRows = wait(readBlobGranule(chunk, range, version, self->bstore)); + RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, self->bstore)); out.arena().dependsOn(chunkRows.arena()); out.append(out.arena(), chunkRows.begin(), chunkRows.size()); } @@ -321,7 +326,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { Version rv = wait(self->doGrv(&tr)); state Version readVersion = rv; std::pair>> blob = - wait(self->readFromBlob(cx, self, threadData->directoryRange, readVersion)); + wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion)); fmt::print("Directory {0} got {1} RV {2}\n", threadData->directoryID, doSetup ? "initial" : "final", @@ -349,6 +354,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { const Optional& blobValue, uint32_t startKey, uint32_t endKey, + Version beginVersion, Version readVersion, const std::pair>>& blob) { threadData->mismatches++; @@ -360,11 +366,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { ev.detail("DirectoryID", format("%08x", threadData->directoryID)) .detail("RangeStart", format("%08x", startKey)) .detail("RangeEnd", format("%08x", endKey)) + .detail("BeginVersion", beginVersion) .detail("Version", readVersion); - fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3}\n", + fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3} - {4}\n", format("%08x", threadData->directoryID), format("%08x", startKey), format("%08x", endKey), + beginVersion, readVersion); if (lastMatching.present()) { fmt::print(" last correct: {}\n", lastMatching.get().printable()); @@ -456,6 +464,29 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { readVersion); } + // because each chunk could be separately collapsed or not if we set beginVersion, we have to track it by chunk + KeyRangeMap beginVersionByChunk; + beginVersionByChunk.insert(normalKeys, 0); + int beginCollapsed = 0; + int beginNotCollapsed = 0; + for (auto& chunk : blob.second) { + if (!chunk.snapshotFile.present()) { + ASSERT(beginVersion > 0); + ASSERT(chunk.snapshotVersion == invalidVersion); + beginCollapsed++; + beginVersionByChunk.insert(chunk.keyRange, beginVersion); + } else { + ASSERT(chunk.snapshotVersion != invalidVersion); + if (beginVersion > 0) { + beginNotCollapsed++; + } + } + } + TEST(beginCollapsed > 0); // BGCorrectness got collapsed request with beginVersion > 0 + TEST(beginNotCollapsed > 0); // BGCorrectness got un-collapsed request with beginVersion > 0 + TEST(beginCollapsed > 0 && + beginNotCollapsed > 0); // BGCorrectness got both collapsed and uncollapsed in the same request! + while (checkIt != threadData->keyData.end() && checkIt->first < endKeyExclusive) { uint32_t key = checkIt->first; if (DEBUG_READ_OP(threadData->directoryID, readVersion)) { @@ -475,6 +506,16 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { for (; idIdx < checkIt->second.writes.size() && checkIt->second.writes[idIdx].writeVersion <= readVersion; idIdx++) { Key nextKeyShouldBe = threadData->getKey(key, idIdx); + Version keyBeginVersion = beginVersionByChunk.rangeContaining(nextKeyShouldBe).cvalue(); + if (keyBeginVersion > checkIt->second.writes[idIdx].writeVersion) { + if (DEBUG_READ_OP(threadData->directoryID, readVersion)) { + fmt::print("DBG READ: Skip ID {0} written @ {1} < beginVersion {2}\n", + idIdx, + checkIt->second.writes[idIdx].clearVersion, + keyBeginVersion); + } + continue; + } if (DEBUG_READ_OP(threadData->directoryID, readVersion)) { fmt::print("DBG READ: Checking ID {0} ({1}) written @ {2}\n", format("%08x", idIdx), @@ -491,6 +532,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { Optional(), startKeyInclusive, endKeyExclusive, + beginVersion, readVersion, blob); return false; @@ -509,6 +551,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { Optional(), startKeyInclusive, endKeyExclusive, + beginVersion, readVersion, blob); return false; @@ -523,6 +566,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { blob.first[resultIdx].value, startKeyInclusive, endKeyExclusive, + beginVersion, readVersion, blob); return false; @@ -545,6 +589,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { Optional(), startKeyInclusive, endKeyExclusive, + beginVersion, readVersion, blob); return false; @@ -565,6 +610,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { state double targetReadBytesPerSec = threadData->targetByteRate * 4; ASSERT(targetReadBytesPerSec > 0); + state Version beginVersion; state Version readVersion; TraceEvent("BlobGranuleCorrectnessReaderStart").log(); @@ -610,26 +656,42 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { state KeyRange range = KeyRangeRef(threadData->getKey(startKey, 0), threadData->getKey(endKey, 0)); // pick read version - // TODO could also pick begin version here ASSERT(threadData->writeVersions.back() >= threadData->minSuccessfulReadVersion); + size_t readVersionIdx; // randomly choose up to date vs time travel read if (deterministicRandom()->random01() < 0.5) { threadData->reads++; + readVersionIdx = threadData->writeVersions.size() - 1; readVersion = threadData->writeVersions.back(); } else { threadData->timeTravelReads++; + size_t startIdx = 0; loop { - int readVersionIdx = deterministicRandom()->randomInt(0, threadData->writeVersions.size()); + readVersionIdx = deterministicRandom()->randomInt(startIdx, threadData->writeVersions.size()); readVersion = threadData->writeVersions[readVersionIdx]; if (readVersion >= threadData->minSuccessfulReadVersion) { break; + } else { + startIdx = readVersionIdx + 1; } } } + // randomly choose begin version or not + beginVersion = 0; + if (deterministicRandom()->random01() < 0.5) { + int startIdx = 0; + int endIdxExclusive = readVersionIdx + 1; + // Choose skewed towards later versions. It's ok if beginVersion isn't readable though because it + // will collapse + size_t beginVersionIdx = (size_t)std::sqrt( + deterministicRandom()->randomInt(startIdx * startIdx, endIdxExclusive * endIdxExclusive)); + beginVersion = threadData->writeVersions[beginVersionIdx]; + } + std::pair>> blob = - wait(self->readFromBlob(cx, self, range, readVersion)); - self->validateResult(threadData, blob, startKey, endKey, 0, readVersion); + wait(self->readFromBlob(cx, self, range, beginVersion, readVersion)); + self->validateResult(threadData, blob, startKey, endKey, beginVersion, readVersion); int resultBytes = blob.first.expectedSize(); threadData->rowsRead += blob.first.size(); @@ -822,7 +884,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion); } std::pair>> blob = - wait(self->readFromBlob(cx, self, threadData->directoryRange, readVersion)); + wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion)); result = self->validateResult(threadData, blob, 0, std::numeric_limits::max(), 0, readVersion); finalRowsValidated = blob.first.size(); diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index d4264058ca..cd97b1960b 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -225,7 +225,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload { } for (const BlobGranuleChunkRef& chunk : chunks) { - RangeResult chunkRows = wait(readBlobGranule(chunk, range, version, self->bstore)); + RangeResult chunkRows = wait(readBlobGranule(chunk, range, 0, version, self->bstore)); out.arena().dependsOn(chunkRows.arena()); out.append(out.arena(), chunkRows.begin(), chunkRows.size()); }