diff --git a/fdbclient/BlobWorkerCommon.h b/fdbclient/BlobWorkerCommon.h index 49aed17985..061213e793 100644 --- a/fdbclient/BlobWorkerCommon.h +++ b/fdbclient/BlobWorkerCommon.h @@ -44,6 +44,7 @@ struct BlobWorkerStats { int numRangesAssigned; int mutationBytesBuffered; int activeReadRequests; + int granulesPendingSplitCheck; Future logger; @@ -62,10 +63,11 @@ struct BlobWorkerStats { readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc), commitVersionChecks("CommitVersionChecks", cc), granuleUpdateErrors("GranuleUpdateErrors", cc), granuleRequestTimeouts("GranuleRequestTimeouts", cc), readRequestsWithBegin("ReadRequestsWithBegin", cc), readRequestsCollapsed("ReadRequestsCollapsed", cc), - numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0) { + numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0) { specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; }); specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; }); specialCounter(cc, "ActiveReadRequests", [this]() { return this->activeReadRequests; }); + specialCounter(cc, "GranulesPendingSplitCheck", [this]() { return this->granulesPendingSplitCheck; }); logger = traceCounters("BlobWorkerMetrics", id, interval, &cc, "BlobWorkerMetrics"); } diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index be38308f3a..fc82b60434 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -858,6 +858,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 ); init( BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT, 1.5 ); + init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); + init( BGCC_MIN_INTERVAL, isSimulated ? 
1.0 : 10.0 ); + // clang-format on if (clientKnobs) { diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 2d87507198..ec7fdc08b0 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -812,6 +812,8 @@ public: double BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN; double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX; double BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT; + double BGCC_TIMEOUT; + double BGCC_MIN_INTERVAL; ServerKnobs(Randomize, ClientKnobs*, IsSimulated); void initialize(Randomize, ClientKnobs*, IsSimulated); diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 912b0842e8..31542262db 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -224,13 +224,16 @@ struct BlobManagerStats { Counter ccRowsChecked; Counter ccBytesChecked; Counter ccMismatches; + Counter ccTimeouts; + Counter ccErrors; Future logger; // Current stats maintained for a given blob worker process explicit BlobManagerStats(UID id, double interval, std::unordered_map* workers) : cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc), granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc), - ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc) { + ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc), + ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc) { specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); }); logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics"); } @@ -2743,6 +2746,25 @@ static void blobManagerExclusionSafetyCheck(Reference self, req.reply.send(reply); } +ACTOR Future bgccCheckGranule(Reference bmData, KeyRange range) { + state std::pair fdbResult = wait(readFromFDB(bmData->db, range)); + + std::pair>> blobResult = + wait(readFromBlob(bmData->db, bmData->bstore, range, 0, 
fdbResult.second)); + + if (!compareFDBAndBlob(fdbResult.first, blobResult, range, fdbResult.second, BM_DEBUG)) { + ++bmData->stats.ccMismatches; + } + + int64_t bytesRead = fdbResult.first.expectedSize(); + + ++bmData->stats.ccGranulesChecked; + bmData->stats.ccRowsChecked += fdbResult.first.size(); + bmData->stats.ccBytesChecked += bytesRead; + + return bytesRead; +} + // FIXME: could eventually make this more thorough by storing some state in the DB or something // FIXME: simpler solution could be to shuffle ranges ACTOR Future bgConsistencyCheck(Reference bmData) { @@ -2775,32 +2797,31 @@ ACTOR Future bgConsistencyCheck(Reference bmData) { tries--; } + state int64_t allowanceBytes = SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES; if (tries == 0) { if (BM_DEBUG) { printf("BGCC couldn't find random range to check, skipping\n"); } - wait(rateLimiter->getAllowance(SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES)); } else { - state std::pair fdbResult = wait(readFromFDB(bmData->db, range)); - - std::pair>> blobResult = - wait(readFromBlob(bmData->db, bmData->bstore, range, 0, fdbResult.second)); - - if (!compareFDBAndBlob(fdbResult.first, blobResult, range, fdbResult.second, BM_DEBUG)) { - ++bmData->stats.ccMismatches; + try { + Optional bytesRead = + wait(timeout(bgccCheckGranule(bmData, range), SERVER_KNOBS->BGCC_TIMEOUT)); + if (bytesRead.present()) { + allowanceBytes = bytesRead.get(); + } else { + ++bmData->stats.ccTimeouts; + } + } catch (Error& e) { + if (e.code() == error_code_operation_cancelled) { + throw e; + } + TraceEvent(SevWarn, "BGCCError", bmData->id).error(e).detail("Epoch", bmData->epoch); + ++bmData->stats.ccErrors; } - - int64_t bytesRead = fdbResult.first.expectedSize(); - - ++bmData->stats.ccGranulesChecked; - bmData->stats.ccRowsChecked += fdbResult.first.size(); - bmData->stats.ccBytesChecked += bytesRead; - - // clear fdb result to release memory since it is a state variable - fdbResult = std::pair(RangeResult(), 0); - - 
// Scoped guard around an int gauge: the one-argument constructor increments
// *counter, and the matching decrement happens exactly once -- either when
// complete() is called or when the holder is destroyed, whichever comes first.
//
// The holder is move-only. With implicitly generated copies, a copy (or an
// assignment from a temporary such as `h = CounterHolder(&n)`) produced two
// holders that would each decrement for a single increment, driving the gauge
// negative. Moving transfers the pending decrement and leaves the source in
// the same inert state as a default-constructed holder.
struct CounterHolder {
    int* counter; // gauge being tracked; nullptr for an inert holder
    bool completed; // true once the decrement has happened (or is not owed)

    // Inert holder: owns no increment, so destruction/complete() do nothing.
    CounterHolder() : counter(nullptr), completed(true) {}

    // Increments *counter and takes responsibility for decrementing it later.
    // `counter` must be non-null and must outlive this holder.
    CounterHolder(int* counter) : counter(counter), completed(false) { (*counter)++; }

    // Copying would duplicate the obligation to decrement; forbid it.
    CounterHolder(const CounterHolder&) = delete;
    CounterHolder& operator=(const CounterHolder&) = delete;

    // Takes over other's pending decrement; other becomes inert.
    CounterHolder(CounterHolder&& other) noexcept : counter(other.counter), completed(other.completed) {
        other.counter = nullptr;
        other.completed = true;
    }

    // Settles whatever this holder currently owes, then takes over other's
    // pending decrement; other becomes inert. Self-assignment is a no-op.
    CounterHolder& operator=(CounterHolder&& other) noexcept {
        if (this != &other) {
            complete();
            counter = other.counter;
            completed = other.completed;
            other.counter = nullptr;
            other.completed = true;
        }
        return *this;
    }

    // Decrements *counter if that has not happened yet. Safe to call repeatedly.
    void complete() {
        if (!completed) {
            completed = true;
            (*counter)--;
        }
    }

    // Guarantees the decrement on every exit path, including cancellation/unwind.
    ~CounterHolder() { complete(); }
};
{ + data->activeFeedQueries--; req.reply.sendError(end_of_stream()); return Void(); }