diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index 4b57a42f34..c8b69dac0b 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -604,7 +604,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
 	loop {
 		wait(delay(CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskPriority::FlushTrace));
 
-		if (!g_network->isSimulated()) {
+		bool logTraces = !g_network->isSimulated() || BUGGIFY_WITH_PROB(0.01);
+		if (logTraces) {
 			TraceEvent ev("TransactionMetrics", cx->dbId);
 
 			ev.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
@@ -657,6 +658,19 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
 		cx->bgLatencies.clear();
 		cx->bgGranulesPerRequest.clear();
 
+		if (cx->usedAnyChangeFeeds && logTraces) {
+			TraceEvent feedEv("ChangeFeedClientMetrics", cx->dbId);
+
+			feedEv.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
+			    .detail("Cluster",
+			            cx->getConnectionRecord()
+			                ? cx->getConnectionRecord()->getConnectionString().clusterKeyName().toString()
+			                : "")
+			    .detail("Internal", cx->internal);
+
+			cx->ccFeed.logToTraceEvent(feedEv);
+		}
+
 		lastLogged = now();
 	}
 }
@@ -1466,9 +1480,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectio
+	    usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
+	    feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
+	    feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
+	    feedPopsFallback("FeedPopsFallback", ccFeed),
 	    smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
 	    connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
@@ -9512,6 +9534,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
                                             bool canReadPopped) {
 	state Database cx(db);
 	state Span span("NAPI:GetChangeFeedStream"_loc);
+	db->usedAnyChangeFeeds = true;
 
 	results->endVersion = end;
@@ -9587,7 +9610,10 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
 				loc = 0;
 			}
 
+			++db->feedStreamStarts;
+
 			if (locations.size() > 1) {
+				++db->feedMergeStreamStarts;
 				std::vector<std::pair<StorageServerInterface, KeyRange>> interfs;
 				for (int i = 0; i < locations.size(); i++) {
 					interfs.emplace_back(locations[i].locations->getInterface(chosenLocations[i]),
@@ -9610,6 +9636,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
 			results->streams.clear();
 			results->storageData.clear();
 			if (e.code() == error_code_change_feed_popped) {
+				++db->feedNonRetriableErrors;
 				CODE_PROBE(true, "getChangeFeedStreamActor got popped");
 				results->mutations.sendError(e);
 				results->refresh.sendError(e);
@@ -9629,18 +9656,27 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
 			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
 			    e.code() == error_code_connection_failed || e.code() == error_code_unknown_change_feed ||
 			    e.code() == error_code_broken_promise || e.code() == error_code_future_version ||
-			    e.code() == error_code_request_maybe_delivered) {
+			    e.code() == error_code_request_maybe_delivered ||
+			    e.code() == error_code_storage_too_many_feed_streams) {
+				++db->feedErrors;
 				db->changeFeedCache.erase(rangeID);
 				cx->invalidateCache(Key(), keys);
-				if (begin == lastBeginVersion) {
+				if (begin == lastBeginVersion || e.code() == error_code_storage_too_many_feed_streams) {
 					// We didn't read anything since the last failure before failing again.
-					// Do exponential backoff, up to 1 second
-					sleepWithBackoff = std::min(1.0, sleepWithBackoff * 1.5);
+					// Back off quickly and exponentially, up to 2 seconds
+					sleepWithBackoff = std::min(2.0, sleepWithBackoff * 5);
+					sleepWithBackoff = std::max(0.1, sleepWithBackoff);
 				} else {
 					sleepWithBackoff = CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY;
 				}
+				TraceEvent("ChangeFeedClientError")
+				    .errorUnsuppressed(e)
+				    .suppressFor(30.0)
+				    .detail("AnyProgress", begin != lastBeginVersion);
 				wait(delay(sleepWithBackoff));
 			} else {
+				++db->feedNonRetriableErrors;
+				TraceEvent("ChangeFeedClientErrorNonRetryable").errorUnsuppressed(e).suppressFor(5.0);
 				results->mutations.sendError(e);
 				results->refresh.sendError(change_feed_cancelled());
 				results->streams.clear();
@@ -9767,6 +9803,7 @@ Future<std::vector<OverlappingChangeFeedEntry>> DatabaseContext::getOverlapping
 }
 
 ACTOR static Future<Void> popChangeFeedBackup(Database cx, Key rangeID, Version version) {
+	++cx->feedPopsFallback;
 	state Transaction tr(cx);
 	loop {
 		try {
@@ -9804,6 +9841,8 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
 	state Database cx(db);
 	state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
 	state Span span("NAPI:PopChangeFeedMutations"_loc);
+	db->usedAnyChangeFeeds = true;
+	++db->feedPops;
 
 	state KeyRange keys = wait(getChangeFeedRange(db, cx, rangeID));
diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 9e5b351bb3..54ce58f997 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -684,7 +684,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	bool buggifySmallBWLag = randomize && BUGGIFY;
 	init( TARGET_BW_LAG,                                       50.0 ); if(buggifySmallBWLag) TARGET_BW_LAG = 10.0;
-	init( TARGET_BW_LAG_BATCH,                                 20.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
+	init( TARGET_BW_LAG_BATCH,                                 30.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
 	init( TARGET_BW_LAG_UPDATE,                                 9.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_UPDATE = 1.0;
 	init( MIN_BW_HISTORY,                                        10 );
 	init( BW_ESTIMATION_INTERVAL,                              10.0 ); if(buggifySmallBWLag) BW_ESTIMATION_INTERVAL = 2.0;
@@ -738,6 +738,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( FETCH_KEYS_PARALLELISM_FULL,                            6 );
 	init( FETCH_KEYS_LOWER_PRIORITY,                              0 );
 	init( SERVE_FETCH_CHECKPOINT_PARALLELISM,                     4 );
+	init( CHANGE_FEED_DISK_READS_PARALLELISM,                  1000 ); if( randomize && BUGGIFY ) CHANGE_FEED_DISK_READS_PARALLELISM = 20;
 	init( BUGGIFY_BLOCK_BYTES,                                10000 );
 	init( STORAGE_RECOVERY_VERSION_LAG_LIMIT,                       2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
 	init( STORAGE_COMMIT_BYTES,                            10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
@@ -776,6 +777,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( MAX_PARALLEL_QUICK_GET_VALUE,                          50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
 	init( QUICK_GET_KEY_VALUES_LIMIT,                          2000 );
 	init( QUICK_GET_KEY_VALUES_LIMIT_BYTES,                     1e7 );
+	init( STORAGE_FEED_QUERY_HARD_LIMIT,                     100000 );
 
 	//Wait Failure
 	init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS,                250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h
index d86058b3b8..8897972bc3 100644
--- a/fdbclient/include/fdbclient/DatabaseContext.h
+++ b/fdbclient/include/fdbclient/DatabaseContext.h
@@ -545,6 +545,16 @@ public:
 	Counter transactionGrvTimedOutBatches;
 	Counter transactionCommitVersionNotFoundForSS;
 
+	// Change Feed metrics. Omit change feed metrics from logging if not used.
+	bool usedAnyChangeFeeds;
+	CounterCollection ccFeed;
+	Counter feedStreamStarts;
+	Counter feedMergeStreamStarts;
+	Counter feedErrors;
+	Counter feedNonRetriableErrors;
+	Counter feedPops;
+	Counter feedPopsFallback;
+
 	ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
 	    bytesPerCommit, bgLatencies, bgGranulesPerRequest;
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 068c28a2c0..37de449650 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -693,6 +693,7 @@ public:
 	int FETCH_KEYS_PARALLELISM_FULL;
 	int FETCH_KEYS_LOWER_PRIORITY;
 	int SERVE_FETCH_CHECKPOINT_PARALLELISM;
+	int CHANGE_FEED_DISK_READS_PARALLELISM;
 	int BUGGIFY_BLOCK_BYTES;
 	int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
 	double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD;
@@ -731,6 +732,7 @@ public:
 	int CHECKPOINT_TRANSFER_BLOCK_BYTES;
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
+	int STORAGE_FEED_QUERY_HARD_LIMIT;
 
 	// Wait Failure
 	int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 38e0eeea83..989b2833b9 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -1016,7 +1016,7 @@ public:
 	// Extra lock that prevents too much post-initial-fetch work from building up, such as mutation applying and change
 	// feed tail fetching
 	FlowLock fetchKeysParallelismFullLock;
-	FlowLock fetchChangeFeedParallelismLock;
+	FlowLock changeFeedDiskReadsLock;
 	int64_t fetchKeysBytesBudget;
 	AsyncVar<bool> fetchKeysBudgetUsed;
 	std::vector<Promise<FetchInjectionInfo*>> readyFetchKeys;
@@ -1057,7 +1057,8 @@ public:
 	CounterCollection cc;
 	Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getMappedRangeQueries, getRangeStreamQueries,
 	    finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries,
-	    emptyQueries, feedRowsQueried, feedBytesQueried, feedStreamQueries, feedVersionQueries;
+	    emptyQueries, feedRowsQueried, feedBytesQueried, feedStreamQueries, rejectedFeedStreamQueries,
+	    feedVersionQueries;
 
 	// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
 	// and cleared from the memory, we do not subtract it but add it to bytesDurable.
@@ -1135,9 +1136,9 @@ public:
 	      lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
 	      bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
 	      feedRowsQueried("FeedRowsQueried", cc), feedBytesQueried("FeedBytesQueried", cc),
-	      feedStreamQueries("FeedStreamQueries", cc), feedVersionQueries("FeedVersionQueries", cc),
-	      bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc),
-	      logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc),
+	      feedStreamQueries("FeedStreamQueries", cc), rejectedFeedStreamQueries("RejectedFeedStreamQueries", cc),
+	      feedVersionQueries("FeedVersionQueries", cc), bytesInput("BytesInput", cc),
+	      logicalBytesInput("LogicalBytesInput", cc), logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc),
 	      kvCommitLogicalBytes("KVCommitLogicalBytes", cc), kvClearRanges("KVClearRanges", cc),
 	      kvSystemClearRanges("KVSystemClearRanges", cc), bytesDurable("BytesDurable", cc),
 	      bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc),
@@ -1195,6 +1196,10 @@ public:
 		specialCounter(cc, "ServeFetchCheckpointWaiting", [self]() {
 			return self->serveFetchCheckpointParallelismLock.waiters();
 		});
+		specialCounter(
+		    cc, "ChangeFeedDiskReadsActive", [self]() { return self->changeFeedDiskReadsLock.activePermits(); });
+		specialCounter(
+		    cc, "ChangeFeedDiskReadsWaiting", [self]() { return self->changeFeedDiskReadsLock.waiters(); });
 		specialCounter(cc, "QueryQueueMax", [self]() { return self->getAndResetMaxQueryQueueSize(); });
 		specialCounter(cc, "BytesStored", [self]() { return self->metrics.byteSample.getEstimate(allKeys); });
 		specialCounter(cc, "ActiveWatches", [self]() { return self->numWatches; });
@@ -1254,6 +1259,7 @@ public:
 	    numWatches(0), noRecentUpdates(false), lastUpdate(now()), updateEagerReads(nullptr),
 	    fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
 	    fetchKeysParallelismFullLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_FULL),
+	    changeFeedDiskReadsLock(SERVER_KNOBS->CHANGE_FEED_DISK_READS_PARALLELISM),
 	    fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
 	    serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
 	    instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
@@ -2622,12 +2628,16 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
 			// To let update storage finish
 			wait(delay(0));
 		}
+
+		wait(data->changeFeedDiskReadsLock.take(TaskPriority::DefaultYield));
+		state FlowLock::Releaser holdingDiskReadsLock(data->changeFeedDiskReadsLock);
 		RangeResult res = wait(
 		    data->storage.readRange(KeyRangeRef(changeFeedDurableKey(req.rangeID, std::max(req.begin, emptyVersion)),
 		                                        changeFeedDurableKey(req.rangeID, req.end)),
 		                            1 << 30,
 		                            remainingDurableBytes,
 		                            options));
+		holdingDiskReadsLock.release();
 		data->counters.kvScanBytes += res.logicalSize();
 		++data->counters.changeFeedDiskReads;
 
@@ -2964,17 +2974,28 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
 	state bool atLatest = false;
 	state bool removeUID = false;
 	state Optional<Version> blockedVersion;
-	if (req.replyBufferSize <= 0) {
-		req.reply.setByteLimit(SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES);
-	} else {
-		req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
-	}
-
-	++data->counters.feedStreamQueries;
-
-	wait(delay(0, TaskPriority::DefaultEndpoint));
 
 	try {
+		++data->counters.feedStreamQueries;
+
+		// FIXME: do something more sophisticated here besides hard limit
+		if (data->activeFeedQueries >= SERVER_KNOBS->STORAGE_FEED_QUERY_HARD_LIMIT ||
+		    (g_network->isSimulated() && BUGGIFY_WITH_PROB(0.005))) {
+			req.reply.sendError(storage_too_many_feed_streams());
+			++data->counters.rejectedFeedStreamQueries;
+			return Void();
+		}
+
+		data->activeFeedQueries++;
+
+		if (req.replyBufferSize <= 0) {
+			req.reply.setByteLimit(SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES);
+		} else {
+			req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
+		}
+
+		wait(delay(0, TaskPriority::DefaultEndpoint));
+
 		if (DEBUG_CF_TRACE) {
 			TraceEvent(SevDebug, "TraceChangeFeedStreamStart", data->thisServerID)
 			    .detail("FeedID", req.rangeID)
@@ -2985,7 +3006,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
 			    .detail("CanReadPopped", req.canReadPopped)
 			    .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
 		}
-		data->activeFeedQueries++;
 
 		wait(success(waitForVersionNoTooOld(data, req.begin)));
diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h
index 664078fe1e..e967caead3 100755
--- a/flow/include/flow/error_definitions.h
+++ b/flow/include/flow/error_definitions.h
@@ -100,6 +100,7 @@ ERROR( data_move_dest_team_not_found, 1076, "Dest team was not found for data mo
 ERROR( blob_worker_full, 1077, "Blob worker cannot take on more granule assignments" )
 ERROR( grv_proxy_memory_limit_exceeded, 1078, "GetReadVersion proxy memory limit exceeded" )
 ERROR( blob_granule_request_failed, 1079, "BlobGranule request failed" )
+ERROR( storage_too_many_feed_streams, 1080, "Too many feed streams to a single storage server" )
 
 ERROR( broken_promise, 1100, "Broken promise" )
 ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
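Note on the new retry schedule: when the client sees storage_too_many_feed_streams, or has made no
progress since the last failure, it now grows its delay by 5x per attempt and clamps it to the
[0.1s, 2.0s] range, replacing the old 1.5x-up-to-1s schedule. Below is a minimal standalone sketch
of how that schedule plays out (plain C++, not Flow; the starting value of 0 and the print loop are
illustrative assumptions, not FDB code):

	#include <algorithm>
	#include <cstdio>

	int main() {
		double sleepWithBackoff = 0.0; // assumed initial value before the first failure
		for (int attempt = 1; attempt <= 5; ++attempt) {
			// The two-step clamp from the patch: grow fast, but stay within [0.1, 2.0] seconds.
			sleepWithBackoff = std::min(2.0, sleepWithBackoff * 5);
			sleepWithBackoff = std::max(0.1, sleepWithBackoff);
			std::printf("attempt %d: sleep %.1fs\n", attempt, sleepWithBackoff);
		}
		// Prints 0.1, 0.5, 2.0, 2.0, 2.0: the cap is reached by the third consecutive failure,
		// so a storage server that is shedding load via the new hard limit sees at most one
		// retry every two seconds per client once the backoff saturates.
		return 0;
	}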