From af60e2ea3291a701ee8f5e201aedacd26ef4beac Mon Sep 17 00:00:00 2001
From: Josh Slocum
Date: Mon, 25 Jul 2022 12:19:41 -0500
Subject: [PATCH] Fixed granule purging bug and improved debugging for purging

---
 fdbserver/BlobGranuleValidation.actor.cpp     |  42 +++---
 fdbserver/BlobManager.actor.cpp               | 142 +++++++++++-------
 .../fdbserver/BlobGranuleValidation.actor.h   |   2 +
 .../workloads/BlobGranuleVerifier.actor.cpp   |  22 ++-
 4 files changed, 127 insertions(+), 81 deletions(-)

diff --git a/fdbserver/BlobGranuleValidation.actor.cpp b/fdbserver/BlobGranuleValidation.actor.cpp
index af395bf71d..3168951a46 100644
--- a/fdbserver/BlobGranuleValidation.actor.cpp
+++ b/fdbserver/BlobGranuleValidation.actor.cpp
@@ -143,30 +143,34 @@ bool compareFDBAndBlob(RangeResult fdb,
 		}
 	}
 
-		printf("Chunks:\n");
-		for (auto& chunk : blob.second) {
-			printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
-
-			printf("  SnapshotFile:\n    %s\n",
-			       chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
-			printf("  DeltaFiles:\n");
-			for (auto& df : chunk.deltaFiles) {
-				printf("    %s\n", df.toString().c_str());
-			}
-			printf("  Deltas: (%d)", chunk.newDeltas.size());
-			if (chunk.newDeltas.size() > 0) {
-				fmt::print(" with version [{0} - {1}]",
-				           chunk.newDeltas[0].version,
-				           chunk.newDeltas[chunk.newDeltas.size() - 1].version);
-			}
-			fmt::print("  IncludedVersion: {}\n", chunk.includedVersion);
-		}
-		printf("\n");
+		printGranuleChunks(blob.second);
 		}
 	}
 	return correct;
 }
 
+void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks) {
+	printf("Chunks:\n");
+	for (auto& chunk : chunks) {
+		printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
+
+		printf("  SnapshotFile:\n    %s\n",
+		       chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
+		printf("  DeltaFiles:\n");
+		for (auto& df : chunk.deltaFiles) {
+			printf("    %s\n", df.toString().c_str());
+		}
+		printf("  Deltas: (%d)", chunk.newDeltas.size());
+		if (chunk.newDeltas.size() > 0) {
+			fmt::print(" with version [{0} - {1}]",
+			           chunk.newDeltas[0].version,
+			           chunk.newDeltas[chunk.newDeltas.size() - 1].version);
+		}
+		fmt::print("  IncludedVersion: {}\n", chunk.includedVersion);
+	}
+	printf("\n");
+}
+
 ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
 	// clear key range and check whether it is merged or not, repeatedly
 	state Transaction tr(cx);
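Note: the refactor above extracts the chunk dump into printGranuleChunks() so it can be shared; the verifier workload at the end of this patch calls it after its post-purge reads. A minimal sketch of the intended call pattern (readFromBlob, BGV_DEBUG, and the types are from this patch; the exact call site shown here is illustrative, not a quote of the code):

    // inside an ACTOR function, after any blob-granule read:
    std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
        wait(readFromBlob(cx, bstore, range, 0, readVersion));
    if (BGV_DEBUG) {
        // prints each chunk's key range, snapshot file, delta files, and delta versions
        printGranuleChunks(blob.second);
    }
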
chunk.snapshotFile.get().toString().c_str() : ""); + printf(" DeltaFiles:\n"); + for (auto& df : chunk.deltaFiles) { + printf(" %s\n", df.toString().c_str()); + } + printf(" Deltas: (%d)", chunk.newDeltas.size()); + if (chunk.newDeltas.size() > 0) { + fmt::print(" with version [{0} - {1}]", + chunk.newDeltas[0].version, + chunk.newDeltas[chunk.newDeltas.size() - 1].version); + } + fmt::print(" IncludedVersion: {}\n", chunk.includedVersion); + } + printf("\n"); +} + ACTOR Future clearAndAwaitMerge(Database cx, KeyRange range) { // clear key range and check whether it is merged or not, repeatedly state Transaction tr(cx); diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 925630e9e7..18a49a6d7e 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -52,6 +52,7 @@ */ #define BM_DEBUG false +#define BM_PURGE_DEBUG false void handleClientBlobRange(KeyRangeMap* knownBlobRanges, Arena& ar, @@ -3168,8 +3169,8 @@ ACTOR Future fullyDeleteGranule(Reference self, Key historyKey, Version purgeVersion, KeyRange granuleRange) { - if (BM_DEBUG) { - fmt::print("Fully deleting granule {0}: init\n", granuleId.toString()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Fully deleting granule {1}: init\n", self->epoch, granuleId.toString()); } // if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially @@ -3195,8 +3196,8 @@ ACTOR Future fullyDeleteGranule(Reference self, filesToDelete.emplace_back(fname); } - if (BM_DEBUG) { - fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Fully deleting granule {1}: deleting {2} files\n", self->epoch, granuleId.toString(), filesToDelete.size()); for (auto filename : filesToDelete) { fmt::print(" - {}\n", filename.c_str()); } @@ -3209,8 +3210,8 @@ ACTOR Future fullyDeleteGranule(Reference self, wait(waitForAll(deletions)); // delete metadata in FDB (history entry and file keys) - if (BM_DEBUG) { - fmt::print("Fully deleting granule {0}: deleting history and file keys\n", granuleId.toString()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Fully deleting granule {1}: deleting history and file keys\n", self->epoch, granuleId.toString()); } state Transaction tr(self->db); @@ -3229,8 +3230,8 @@ ACTOR Future fullyDeleteGranule(Reference self, } } - if (BM_DEBUG) { - fmt::print("Fully deleting granule {0}: success\n", granuleId.toString()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Fully deleting granule {1}: success\n", self->epoch, granuleId.toString()); } TraceEvent("GranuleFullPurge", self->id) @@ -3259,8 +3260,8 @@ ACTOR Future partiallyDeleteGranule(Reference self, UID granuleId, Version purgeVersion, KeyRange granuleRange) { - if (BM_DEBUG) { - fmt::print("Partially deleting granule {0}: init\n", granuleId.toString()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Partially deleting granule {1}: init\n", self->epoch, granuleId.toString()); } state Reference bstore = wait(getBStoreForGranule(self, granuleRange)); @@ -3309,8 +3310,8 @@ ACTOR Future partiallyDeleteGranule(Reference self, filesToDelete.emplace_back(fname); } - if (BM_DEBUG) { - fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Partially deleting granule {1}: deleting {2} files\n", self->epoch, granuleId.toString(), filesToDelete.size()); for (auto filename : filesToDelete) { fmt::print(" - 
{0}\n", filename); } @@ -3327,8 +3328,8 @@ ACTOR Future partiallyDeleteGranule(Reference self, wait(waitForAll(deletions)); // delete metadata in FDB (deleted file keys) - if (BM_DEBUG) { - fmt::print("Partially deleting granule {0}: deleting file keys\n", granuleId.toString()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Partially deleting granule {1}: deleting file keys\n", self->epoch, granuleId.toString()); } state Transaction tr(self->db); @@ -3347,8 +3348,8 @@ ACTOR Future partiallyDeleteGranule(Reference self, } } - if (BM_DEBUG) { - fmt::print("Partially deleting granule {0}: success\n", granuleId.toString()); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Partially deleting granule {1}: success\n", self->epoch, granuleId.toString()); } TraceEvent("GranulePartialPurge", self->id) .detail("Epoch", self->epoch) @@ -3373,8 +3374,9 @@ ACTOR Future partiallyDeleteGranule(Reference self, * processing this purge intent. */ ACTOR Future purgeRange(Reference self, KeyRangeRef range, Version purgeVersion, bool force) { - if (BM_DEBUG) { - fmt::print("purgeRange starting for range [{0} - {1}) @ purgeVersion={2}, force={3}\n", + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} purgeRange starting for range [{1} - {2}) @ purgeVersion={3}, force={4}\n", + self->epoch, range.begin.printable(), range.end.printable(), purgeVersion, @@ -3396,7 +3398,7 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range // track which granules we have already added to traversal // note: (startKey, startVersion) uniquely identifies a granule - state std::unordered_set, boost::hash>> + state std::unordered_set, boost::hash>> visited; // find all active granules (that comprise the range) and add to the queue @@ -3408,8 +3410,9 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range state KeyRangeMap::iterator activeRange; for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) { - if (BM_DEBUG) { - fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be purged\n", + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Checking if active range [{1} - {2}), owned by BW {3}, should be purged\n", + self->epoch, activeRange.begin().printable(), activeRange.end().printable(), activeRange.value().toString()); @@ -3417,6 +3420,7 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range // assumption: purge boundaries must respect granule boundaries if (activeRange.begin() < range.begin || activeRange.end() > range.end) { + TraceEvent(SevWarn, "GranulePurgeRangesUnaligned", self->id).detail("Epoch", self->epoch).detail("PurgeRange", range).detail("GranuleRange", activeRange.range()); continue; } @@ -3426,20 +3430,24 @@ ACTOR Future purgeRange(Reference self, KeyRangeRef range loop { try { - if (BM_DEBUG) { - fmt::print("Fetching latest history entry for range [{0} - {1})\n", + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Fetching latest history entry for range [{1} - {2})\n", + self->epoch, activeRange.begin().printable(), activeRange.end().printable()); } + // FIXME: doing this serially will likely be too slow for large purges Optional history = wait(getLatestGranuleHistory(&tr, activeRange.range())); // TODO: can we tell from the krm that this range is not valid, so that we don't need to do a // get if (history.present()) { - if (BM_DEBUG) { - printf("Adding range to history queue\n"); + if (BM_PURGE_DEBUG) { + fmt::print("BM {0} Adding range to history queue: [{1} - {2}) @ {3} ({4})\n", self->epoch, activeRange.begin().printable(), activeRange.end().printable(), 
@@ -3448,8 +3456,8 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 		}
 	}
 
-	if (BM_DEBUG) {
-		printf("Beginning BFS traversal of history\n");
+	if (BM_PURGE_DEBUG) {
+		fmt::print("BM {0} Beginning BFS traversal of {1} history items for range [{2} - {3})\n", self->epoch, historyEntryQueue.size(), range.begin.printable(), range.end.printable());
 	}
 	while (!historyEntryQueue.empty()) {
 		// process the node at the front of the queue and remove it
@@ -3459,8 +3467,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 		std::tie(currRange, startVersion, endVersion) = historyEntryQueue.front();
 		historyEntryQueue.pop();
 
-		if (BM_DEBUG) {
-			fmt::print("Processing history node [{0} - {1}) with versions [{2}, {3})\n",
+		if (BM_PURGE_DEBUG) {
+			fmt::print("BM {0} Processing history node [{1} - {2}) with versions [{3}, {4})\n",
+			           self->epoch,
 			           currRange.begin.printable(),
 			           currRange.end.printable(),
 			           startVersion,
@@ -3485,12 +3494,15 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 		}
 
 		if (!foundHistory) {
+			if (BM_PURGE_DEBUG) {
+				fmt::print("BM {0} No history for this node, skipping\n", self->epoch);
+			}
 			continue;
 		}
 
-		if (BM_DEBUG) {
-			fmt::print("Found history entry for this node. It's granuleID is {0}\n",
-			           currHistoryNode.granuleID.toString());
+		if (BM_PURGE_DEBUG) {
+			fmt::print("BM {0} Found history entry for this node. Its granuleID is {1}\n",
+			           self->epoch,
+			           currHistoryNode.granuleID.toString());
 		}
 
 		// There are three cases this granule can fall into:
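Note: the loop above is a breadth-first walk backwards through the granule-history DAG. Each queue entry is (range, startVersion, endVersion), and a node's parents are pushed with their own start version, bounded above by the child's. A toy, self-contained model of the queue discipline (types simplified to strings and ints; the real code re-reads the history keyspace for each node):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <queue>
    #include <string>
    #include <tuple>
    #include <vector>

    using Version = int64_t;
    using KeyRange = std::pair<std::string, std::string>; // [begin, end)

    int main() {
        // (range begin, startVersion) -> parents as (range, parentVersion)
        std::map<std::pair<std::string, Version>,
                 std::vector<std::pair<KeyRange, Version>>> parents;
        // a granule [a,z) created at v50 by merging two parents created at v10
        parents[{ "a", 50 }] = { { { "a", "m" }, 10 }, { { "m", "z" }, 10 } };

        std::queue<std::tuple<KeyRange, Version, Version>> historyEntryQueue;
        historyEntryQueue.push({ { "a", "z" }, 50, INT64_MAX }); // active granule

        while (!historyEntryQueue.empty()) {
            auto [currRange, startVersion, endVersion] = historyEntryQueue.front();
            historyEntryQueue.pop();
            std::cout << "[" << currRange.first << " - " << currRange.second << ") @ ["
                      << startVersion << ", " << endVersion << ")\n";
            // full/partial deletion decision happens here (see earlier sketch)
            for (auto& [parentRange, parentVersion] : parents[{ currRange.first, startVersion }]) {
                // parent was alive over [parentVersion, startVersion)
                historyEntryQueue.push({ parentRange, parentVersion, startVersion });
            }
        }
        return 0;
    }
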
@@ -3500,33 +3512,38 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 		// and so this granule should be partially deleted
 		// - otherwise, this granule is active, so don't schedule it for deletion
 		if (force || endVersion <= purgeVersion) {
-			if (BM_DEBUG) {
-				fmt::print("Granule {0} will be FULLY deleted\n", currHistoryNode.granuleID.toString());
+			if (BM_PURGE_DEBUG) {
+				fmt::print("BM {0} Granule {1} will be FULLY deleted\n", self->epoch, currHistoryNode.granuleID.toString());
 			}
 			toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey, currRange });
 		} else if (startVersion < purgeVersion) {
-			if (BM_DEBUG) {
-				fmt::print("Granule {0} will be partially deleted\n", currHistoryNode.granuleID.toString());
+			if (BM_PURGE_DEBUG) {
+				fmt::print("BM {0} Granule {1} will be partially deleted\n", self->epoch, currHistoryNode.granuleID.toString());
 			}
 			toPartiallyDelete.push_back({ currHistoryNode.granuleID, currRange });
 		}
 
 		// add all of the node's parents to the queue
+		if (BM_PURGE_DEBUG) {
+			fmt::print("BM {0} Checking {1} parents\n", self->epoch, currHistoryNode.parentVersions.size());
+		}
 		for (int i = 0; i < currHistoryNode.parentVersions.size(); i++) {
-			// for (auto& parent : currHistoryNode.parentVersions.size()) {
 			// if we already added this node to queue, skip it; otherwise, mark it as visited
 			KeyRangeRef parentRange(currHistoryNode.parentBoundaries[i], currHistoryNode.parentBoundaries[i + 1]);
 			Version parentVersion = currHistoryNode.parentVersions[i];
-			if (visited.count({ parentRange.begin.begin(), parentVersion })) {
-				if (BM_DEBUG) {
-					fmt::print("Already added {0} to queue, so skipping it\n", currHistoryNode.granuleID.toString());
+			std::string beginStr = parentRange.begin.toString();
+			if (!visited.insert({ beginStr, parentVersion }).second) {
+				if (BM_PURGE_DEBUG) {
+					fmt::print("BM {0} Already added [{1} - {2}) @ {3} - {4} to queue, so skipping it\n",
+					           self->epoch,
+					           parentRange.begin.printable(),
+					           parentRange.end.printable(),
+					           parentVersion,
+					           startVersion);
 				}
 				continue;
 			}
-			visited.insert({ parentRange.begin.begin(), parentVersion });
 
-			if (BM_DEBUG) {
-				fmt::print("Adding parent [{0} - {1}) with versions [{2} - {3}) to queue\n",
+			if (BM_PURGE_DEBUG) {
+				fmt::print("BM {0} Adding parent [{1} - {2}) @ {3} - {4} to queue\n",
+				           self->epoch,
 				           parentRange.begin.printable(),
 				           parentRange.end.printable(),
 				           parentVersion,
@@ -3554,10 +3571,19 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 	// we won't run into any issues with trying to "re-delete" a blob file since deleting
 	// a file that doesn't exist is considered successful
 
+	TraceEvent("PurgeGranulesTraversalComplete", self->id)
+	    .detail("Epoch", self->epoch)
+	    .detail("Range", range)
+	    .detail("PurgeVersion", purgeVersion)
+	    .detail("Force", force)
+	    .detail("VisitedCount", visited.size())
+	    .detail("DeletingFullyCount", toFullyDelete.size())
+	    .detail("DeletingPartiallyCount", toPartiallyDelete.size());
+
 	state std::vector<Future<Void>> partialDeletions;
 	state int i;
-	if (BM_DEBUG) {
-		fmt::print("{0} granules to fully delete\n", toFullyDelete.size());
+	if (BM_PURGE_DEBUG) {
+		fmt::print("BM {0}: {1} granules to fully delete\n", self->epoch, toFullyDelete.size());
 	}
 	for (i = toFullyDelete.size() - 1; i >= 0; --i) {
 		state UID granuleId;
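Note the asymmetry in the deletion loops in the next hunk: each fullyDeleteGranule() is awaited inline, so full deletions (which also remove history keys) run one at a time, in reverse traversal order, while partial deletions are only collected into partialDeletions. The join sits just past the shown hunks in unshown context, following the same pattern as the waitForAll(deletions) calls earlier in this patch:

    // partial deletes touch disjoint granules, so they are safe to run concurrently;
    // waitForAll surfaces the first error out of purgeRange
    wait(waitForAll(partialDeletions));
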
@@ -3565,22 +3591,22 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 		Key historyKey;
 		KeyRange keyRange;
 		std::tie(granuleId, historyKey, keyRange) = toFullyDelete[i];
 		// FIXME: consider batching into a single txn (need to take care of txn size limit)
-		if (BM_DEBUG) {
-			fmt::print("About to fully delete granule {0}\n", granuleId.toString());
+		if (BM_PURGE_DEBUG) {
+			fmt::print("BM {0}: About to fully delete granule {1}\n", self->epoch, granuleId.toString());
 		}
 		wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion, range));
 	}
 
-	if (BM_DEBUG) {
-		fmt::print("{0} granules to partially delete\n", toPartiallyDelete.size());
+	if (BM_PURGE_DEBUG) {
+		fmt::print("BM {0}: {1} granules to partially delete\n", self->epoch, toPartiallyDelete.size());
 	}
 	for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
 		UID granuleId;
 		KeyRange range;
 		std::tie(granuleId, range) = toPartiallyDelete[i];
-		if (BM_DEBUG) {
-			fmt::print("About to partially delete granule {0}\n", granuleId.toString());
+		if (BM_PURGE_DEBUG) {
+			fmt::print("BM {0}: About to partially delete granule {1}\n", self->epoch, granuleId.toString());
 		}
 		partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, purgeVersion, range));
 	}
@@ -3592,8 +3618,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
 	// another purgeIntent that got written for this table while we were processing this one.
 	// If that is the case, we should not clear the key. Otherwise, we can just clear the key.
 
-	if (BM_DEBUG) {
-		fmt::print("Successfully purged range [{0} - {1}) at purgeVersion={2}\n",
+	if (BM_PURGE_DEBUG) {
+		fmt::print("BM {0}: Successfully purged range [{1} - {2}) at purgeVersion={3}\n",
+		           self->epoch,
 		           range.begin.printable(),
 		           range.end.printable(),
 		           purgeVersion);
@@ -3679,8 +3706,9 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
 			}
 			purgeMap.insert(range, std::make_pair(purgeVersion, force));
 
-			if (BM_DEBUG) {
-				fmt::print("about to purge range [{0} - {1}) @ {2}, force={3}\n",
+			if (BM_PURGE_DEBUG) {
+				fmt::print("BM {0} about to purge range [{1} - {2}) @ {3}, force={4}\n",
+				           self->epoch,
 				           range.begin.printable(),
 				           range.end.printable(),
 				           purgeVersion,
@@ -3732,8 +3760,8 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
 			}
 		}
 
-		if (BM_DEBUG) {
-			printf("Done clearing current set of purge intents.\n");
+		if (BM_PURGE_DEBUG) {
+			fmt::print("BM {0} Done clearing current set of purge intents.\n", self->epoch);
 		}
 
 		CODE_PROBE(true, "BM finished processing purge intents");
diff --git a/fdbserver/include/fdbserver/BlobGranuleValidation.actor.h b/fdbserver/include/fdbserver/BlobGranuleValidation.actor.h
index 838cc18ec4..db4cdc2891 100644
--- a/fdbserver/include/fdbserver/BlobGranuleValidation.actor.h
+++ b/fdbserver/include/fdbserver/BlobGranuleValidation.actor.h
@@ -51,6 +51,8 @@ bool compareFDBAndBlob(RangeResult fdb,
                        Version v,
                        bool debug);
 
+void printGranuleChunks(const Standalone<VectorRef<BlobGranuleChunkRef>>& chunks);
+
 ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
 
 #include "flow/unactorcompiler.h"
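Note: the verifier change below turns the purge check into a two-sided boundary test: a read exactly at the previous purge version must still succeed, and a read just below the oldest snapshot that survived the purge must fail. A condensed restatement (the error name is from FDB's blob granule error set; the actual catch clause appears at the end of the hunk):

    // must succeed: purging keeps the purge version itself readable
    wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));

    try {
        // must fail: everything below the surviving snapshots is gone
        wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
        ASSERT(false); // purge left old data readable
    } catch (Error& e) {
        ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
    }
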
diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp
index 0c47c9abd2..f7b6e799c5 100644
--- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp
+++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp
@@ -309,20 +309,30 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
 			// reading older than the purge version
 			if (doPurging) {
 				wait(self->killBlobWorkers(cx, self));
-				std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
+				if (BGV_DEBUG) {
+					fmt::print("BGV Reading post-purge [{0} - {1}) @ {2}\n", oldRead.range.begin.printable(), oldRead.range.end.printable(), prevPurgeVersion);
+				}
+				// ensure purge version exactly is still readable
+				std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead1 =
 				    wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
+				if (BGV_DEBUG) {
+					fmt::print("BGV Post-purge first read:\n");
+					printGranuleChunks(versionRead1.second);
+				}
 				try {
+					// read at purgeVersion - 1, should NOT be readable
 					Version minSnapshotVersion = newPurgeVersion;
-					for (auto& it : versionRead.second) {
+					for (auto& it : versionRead1.second) {
 						minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
 					}
 					if (BGV_DEBUG) {
-						fmt::print("Reading post-purge @ {0}\n", minSnapshotVersion - 1);
+						fmt::print("BGV Reading post-purge again [{0} - {1}) @ {2}\n", oldRead.range.begin.printable(), oldRead.range.end.printable(), minSnapshotVersion - 1);
 					}
-					std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
+					std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead2 =
 					    wait(readFromBlob(cx, self->bstore, oldRead.range, 0, minSnapshotVersion - 1));
 					if (BGV_DEBUG) {
-						fmt::print("ERROR: data not purged! Read successful!!\n");
+						fmt::print("BGV ERROR: data not purged! Read successful!!\n");
+						printGranuleChunks(versionRead2.second);
 					}
 					ASSERT(false);
 				} catch (Error& e) {
@@ -507,6 +517,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
 		// For some reason simulation is still passing when this fails?.. so assert for now
 		ASSERT(result);
 
+		// FIXME: if doPurging was set, possibly do one last purge here, and verify it succeeds with no errors
+
 		if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
 			CODE_PROBE(true, "BGV clearing database and awaiting merge");
 			wait(clearAndAwaitMerge(cx, normalKeys));