diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp
index 66cee7b0b7..b5fa48ff0d 100644
--- a/fdbcli/BlobRangeCommand.actor.cpp
+++ b/fdbcli/BlobRangeCommand.actor.cpp
@@ -99,20 +99,68 @@ ACTOR Future<Void> doBlobPurge(Database db, Key startKey, Key endKey, Optional<Version> version) {
 
+ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional<Version> version) {
+    state Transaction tr(db);
+    state Version readVersionOut = invalidVersion;
+    loop {
+        try {
+            wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut)));
+            return readVersionOut;
+        } catch (Error& e) {
+            wait(tr.onError(e));
+        }
+    }
+}
+
 ACTOR Future<Void> doBlobCheck(Database db, Key startKey, Key endKey, Optional<Version> version) {
     state Transaction tr(db);
     state Version readVersionOut = invalidVersion;
     state double elapsed = -timer_monotonic();
+    state KeyRange range = KeyRange(KeyRangeRef(startKey, endKey));
+    state Standalone<VectorRef<KeyRangeRef>> allRanges;
 
     loop {
         try {
-            wait(success(tr.readBlobGranules(KeyRange(KeyRangeRef(startKey, endKey)), 0, version, &readVersionOut)));
-
-            elapsed += timer_monotonic();
+            wait(store(allRanges, tr.getBlobGranuleRanges(range)));
             break;
         } catch (Error& e) {
             wait(tr.onError(e));
         }
     }
 
+    if (allRanges.empty()) {
+        fmt::print("ERROR: No blob ranges for [{0} - {1})\n", startKey.printable(), endKey.printable());
+        return Void();
+    }
+    fmt::print("Loaded {0} blob ranges to check\n", allRanges.size());
+    state std::vector<Future<Version>> checkParts;
+    // chunk up to smaller ranges than max
+    int maxChunkSize = 1000;
+    KeyRange currentChunk;
+    int currentChunkSize = 0;
+    for (auto& it : allRanges) {
+        if (currentChunkSize == maxChunkSize) {
+            checkParts.push_back(checkBlobSubrange(db, currentChunk, version));
+            currentChunkSize = 0;
+        }
+        if (currentChunkSize == 0) {
+            currentChunk = it;
+        } else if (it.begin != currentChunk.end) {
+            fmt::print("ERROR: Blobrange check failed, gap in blob ranges from [{0} - {1})\n",
+                       currentChunk.end.printable(),
+                       it.begin.printable());
+            return Void();
+        } else {
+            currentChunk = KeyRangeRef(currentChunk.begin, it.end);
+        }
+        currentChunkSize++;
+    }
+    checkParts.push_back(checkBlobSubrange(db, currentChunk, version));
+
+    wait(waitForAll(checkParts));
+    readVersionOut = checkParts.back().get();
+
+    elapsed += timer_monotonic();
+
     fmt::print("Blob check complete for [{0} - {1}) @ {2} in {3:.6f} seconds\n",
                startKey.printable(),
                endKey.printable(),
diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp
index aaaf8ac77d..7bbfae32f8 100644
--- a/fdbserver/BlobManager.actor.cpp
+++ b/fdbserver/BlobManager.actor.cpp
@@ -259,6 +259,7 @@ struct BlobManagerStats {
 
     Counter granuleSplits;
     Counter granuleWriteHotSplits;
+    Counter granuleMerges;
     Counter ccGranulesChecked;
     Counter ccRowsChecked;
     Counter ccBytesChecked;
@@ -270,16 +271,27 @@ struct BlobManagerStats {
     Counter granulesPartiallyPurged;
     Counter filesPurged;
     Future<Void> logger;
+    int64_t activeMerges;
 
     // Current stats maintained for a given blob worker process
-    explicit BlobManagerStats(UID id, double interval, std::unordered_map<UID, BlobWorkerInterface>* workers)
+    explicit BlobManagerStats(UID id,
+                              double interval,
+                              int64_t epoch,
+                              std::unordered_map<UID, BlobWorkerInterface>* workers,
+                              std::unordered_map<Key, bool>* mergeHardBoundaries,
+                              std::unordered_map<Key, BlobGranuleMergeBoundary>* mergeBoundaries)
       : cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
-        granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc),
-        ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc),
-        ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
+        granuleWriteHotSplits("GranuleWriteHotSplits", cc), granuleMerges("GranuleMerges", cc),
+        ccGranulesChecked("CCGranulesChecked", cc), ccRowsChecked("CCRowsChecked", cc),
+        ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc), ccTimeouts("CCTimeouts", cc),
+        ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
         granulesFullyPurged("GranulesFullyPurged", cc), granulesPartiallyPurged("GranulesPartiallyPurged", cc),
-        filesPurged("FilesPurged", cc) {
+        filesPurged("FilesPurged", cc), activeMerges(0) {
         specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
+        specialCounter(cc, "Epoch", [epoch]() { return epoch; });
+        specialCounter(cc, "ActiveMerges", [this]() { return this->activeMerges; });
+        specialCounter(cc, "HardBoundaries", [mergeHardBoundaries]() { return mergeHardBoundaries->size(); });
+        specialCounter(cc, "SoftBoundaries", [mergeBoundaries]() { return mergeBoundaries->size(); });
         logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
     }
 };
@@ -364,18 +376,23 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
     Promise<Void> foundBlobWorkers;
     Promise<Void> doneRecovering;
 
-    int64_t epoch = -1;
+    int64_t epoch;
     int64_t seqNo = 1;
 
     Promise<Void> iAmReplaced;
 
-    BlobManagerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, Optional<Key> dcId)
-      : id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
+    BlobManagerData(UID id,
+                    Reference<AsyncVar<ServerDBInfo> const> dbInfo,
+                    Database db,
+                    Optional<Key> dcId,
+                    int64_t epoch)
+      : id(id), db(db), dcId(dcId),
+        stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, epoch, &workersById, &mergeHardBoundaries, &mergeBoundaries),
         knownBlobRanges(false, normalKeys.end), tenantData(BGTenantMap(dbInfo)),
         mergeCandidates(MergeCandidateInfo(MergeCandidateUnknown), normalKeys.end),
         activeGranuleMerges(invalidVersion, normalKeys.end),
         concurrentMergeChecks(SERVER_KNOBS->BLOB_MANAGER_CONCURRENT_MERGE_CHECKS),
-        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
+        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0), epoch(epoch) {}
 
     // only initialize blob store if actually needed
     void initBStore() {
@@ -1874,6 +1891,7 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
                                        std::vector<UID> parentGranuleIDs,
                                        std::vector<Key> parentGranuleRanges,
                                        std::vector<Version> parentGranuleStartVersions) {
+    ++bmData->stats.activeMerges;
 
     // wait for BM to be fully recovered before starting actual merges
     wait(bmData->doneRecovering.getFuture());
@@ -1920,6 +1938,8 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
                             BoundaryEvaluation(bmData->epoch, seqnoForEval, BoundaryEvalType::MERGE, 0, 0));
     bmData->setMergeCandidate(mergeRange, MergeCandidateMerging);
 
+    --bmData->stats.activeMerges;
+
     return Void();
 }
 
@@ -1937,6 +1957,8 @@ ACTOR Future<Void> doMerge(Reference<BlobManagerData> bmData,
     }
     ranges.push_back(std::get<1>(toMerge.back()).end);
 
+    ++bmData->stats.granuleMerges;
+
     try {
         std::pair<UID, Version> persistMerge =
             wait(persistMergeGranulesStart(bmData, mergeRange, ids, ranges, startVersions));
@@ -4251,7 +4273,8 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
         makeReference<BlobManagerData>(deterministicRandom()->randomUniqueID(),
                                        dbInfo,
                                        openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
-                                       bmInterf.locality.dcId());
+                                       bmInterf.locality.dcId(),
+                                       epoch);
 
     state Future<Void> collection = actorCollection(self->addActor.getFuture());
@@ -4260,8 +4283,6 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
     }
 
     TraceEvent("BlobManagerInit", bmInterf.id()).detail("Epoch", epoch).log();
 
-    self->epoch = epoch;
-
     // although we start the recruiter, we wait until existing workers are ack'd
     auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
         dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });