From b6450f9eaa0fd1aa2e24b5fd23684a4c07ee8eb6 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 12 Jan 2023 16:15:05 -0600 Subject: [PATCH] More ss cf perf fixes main (#9109) * changing future version logic for change feed fetch * Optimizing change feed data structures and accesses * coalescing change feed request ranges for merge cursor if they're to the same team * fixing over-read of memory mutations for change feeds * feed filter mutations common prefix cpu optimiation * fix formatting --- fdbclient/ClientKnobs.cpp | 3 +- fdbclient/NativeAPI.actor.cpp | 49 ++++++++++ fdbclient/include/fdbclient/ClientKnobs.h | 1 + fdbserver/storageserver.actor.cpp | 107 +++++++++++++++------- 4 files changed, 128 insertions(+), 32 deletions(-) diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index e81805adf7..d12d450d18 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -81,7 +81,8 @@ void ClientKnobs::initialize(Randomize randomize) { init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1; init( CHANGE_FEED_POP_TIMEOUT, 10.0 ); init( CHANGE_FEED_STREAM_MIN_BYTES, 1e4 ); if( randomize && BUGGIFY ) CHANGE_FEED_STREAM_MIN_BYTES = 1; - init( CHANGE_FEED_START_INTERVAL, 10.0 ); + init( CHANGE_FEED_START_INTERVAL, 20.0 ); if( randomize && BUGGIFY ) CHANGE_FEED_START_INTERVAL = 10.0; + init( CHANGE_FEED_COALESCE_LOCATIONS, true ); if( randomize && BUGGIFY ) CHANGE_FEED_COALESCE_LOCATIONS = false; init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 1af20dcd5d..01ba5ac0f5 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -10185,6 +10185,51 @@ ACTOR Future singleChangeFeedStream(Reference db, return Void(); } +void coalesceChangeFeedLocations(std::vector& locations) { + // only coalesce if same tenant + if (locations.front().tenantEntry.id != locations.back().tenantEntry.id) { + return; + } + std::vector teamUIDs; + bool anyToCoalesce = false; + teamUIDs.reserve(locations.size()); + for (int i = 0; i < locations.size(); i++) { + ASSERT(locations[i].locations->size() > 0); + UID teamUID = locations[i].locations->getId(0); + for (int j = 1; j < locations[i].locations->size(); j++) { + UID locUID = locations[i].locations->getId(j); + teamUID = UID(teamUID.first() ^ locUID.first(), teamUID.second() ^ locUID.second()); + } + if (!teamUIDs.empty() && teamUIDs.back() == teamUID) { + anyToCoalesce = true; + } + teamUIDs.push_back(teamUID); + } + + if (!anyToCoalesce) { + return; + } + + CODE_PROBE(true, "coalescing change feed locations"); + + // FIXME: there's technically a probability of "hash" collisions here, but it's extremely low. Could validate that + // two teams with the same xor are in fact the same, or fall back to not doing this if it gets a wrong shard server + // error or something + + std::vector coalesced; + coalesced.reserve(locations.size()); + coalesced.push_back(locations[0]); + for (int i = 1; i < locations.size(); i++) { + if (teamUIDs[i] == teamUIDs[i - 1]) { + coalesced.back().range = KeyRangeRef(coalesced.back().range.begin, locations[i].range.end); + } else { + coalesced.push_back(locations[i]); + } + } + + locations = coalesced; +} + ACTOR Future getChangeFeedStreamActor(Reference db, Reference results, Key rangeID, @@ -10226,6 +10271,10 @@ ACTOR Future getChangeFeedStreamActor(Reference db, throw unknown_change_feed(); } + if (CLIENT_KNOBS->CHANGE_FEED_COALESCE_LOCATIONS && locations.size() > 1) { + coalesceChangeFeedLocations(locations); + } + state std::vector chosenLocations(locations.size()); state int loc = 0; while (loc < locations.size()) { diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 75e54cf644..fa4fe246cf 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -79,6 +79,7 @@ public: double CHANGE_FEED_POP_TIMEOUT; int64_t CHANGE_FEED_STREAM_MIN_BYTES; double CHANGE_FEED_START_INTERVAL; + bool CHANGE_FEED_COALESCE_LOCATIONS; int MAX_BATCH_SIZE; double GRV_BATCH_TIMEOUT; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index e9b29b6c25..957f6213b2 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1018,12 +1018,12 @@ public: KeyRangeMap cachedRangeMap; // indicates if a key-range is being cached KeyRangeMap>> keyChangeFeed; - std::map> uidChangeFeed; + std::unordered_map> uidChangeFeed; Deque, Version>> changeFeedVersions; std::map> changeFeedDestroys; std::set currentChangeFeeds; std::set fetchingChangeFeeds; - std::unordered_map> changeFeedClientVersions; + std::unordered_map> changeFeedClientVersions; std::unordered_map changeFeedCleanupDurable; int64_t activeFeedQueries = 0; int64_t changeFeedMemoryBytes = 0; @@ -2645,7 +2645,8 @@ MutationsAndVersionRef filterMutationsInverted(Arena& arena, MutationsAndVersion MutationsAndVersionRef filterMutations(Arena& arena, MutationsAndVersionRef const& m, KeyRange const& range, - bool inverted) { + bool inverted, + int commonPrefixLength) { if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) { return m; } @@ -2657,22 +2658,28 @@ MutationsAndVersionRef filterMutations(Arena& arena, Optional> modifiedMutations; for (int i = 0; i < m.mutations.size(); i++) { if (m.mutations[i].type == MutationRef::SetValue) { - if (modifiedMutations.present() && range.contains(m.mutations[i].param1)) { + bool inRange = range.begin.compareSuffix(m.mutations[i].param1, commonPrefixLength) <= 0 && + m.mutations[i].param1.compareSuffix(range.end, commonPrefixLength) < 0; + if (modifiedMutations.present() && inRange) { modifiedMutations.get().push_back(arena, m.mutations[i]); } - if (!modifiedMutations.present() && !range.contains(m.mutations[i].param1)) { + if (!modifiedMutations.present() && !inRange) { modifiedMutations = m.mutations.slice(0, i); arena.dependsOn(range.arena()); } } else { ASSERT(m.mutations[i].type == MutationRef::ClearRange); + // param1 < range.begin || param2 > range.end if (!modifiedMutations.present() && - (m.mutations[i].param1 < range.begin || m.mutations[i].param2 > range.end)) { + (m.mutations[i].param1.compareSuffix(range.begin, commonPrefixLength) < 0 || + m.mutations[i].param2.compareSuffix(range.end, commonPrefixLength) > 0)) { modifiedMutations = m.mutations.slice(0, i); arena.dependsOn(range.arena()); } if (modifiedMutations.present()) { - if (m.mutations[i].param1 < range.end && range.begin < m.mutations[i].param2) { + // param1 < range.end && range.begin < param2 + if (m.mutations[i].param1.compareSuffix(range.end, commonPrefixLength) < 0 && + range.begin.compareSuffix(m.mutations[i].param2, commonPrefixLength) < 0) { modifiedMutations.get().push_back(arena, MutationRef(MutationRef::ClearRange, std::max(range.begin, m.mutations[i].param1), @@ -2776,9 +2783,12 @@ static std::deque>::const_iterator searchChan enum FeedDiskReadState { STARTING, NORMAL, DISK_CATCHUP }; ACTOR Future> getChangeFeedMutations(StorageServer* data, + Reference feedInfo, ChangeFeedStreamRequest req, bool inverted, bool atLatest, + bool doFilterMutations, + int commonFeedPrefixLength, FeedDiskReadState* feedDiskReadState) { state ChangeFeedStreamReply reply; state ChangeFeedStreamReply memoryReply; @@ -2809,21 +2819,19 @@ ACTOR Future> getChangeFeedMutations(Stor throw wrong_shard_server(); } - auto feed = data->uidChangeFeed.find(req.rangeID); - if (feed == data->uidChangeFeed.end()) { + if (feedInfo->removing) { throw unknown_change_feed(); } - state Reference feedInfo = feed->second; - // We must copy the mutationDeque when fetching the durable bytes in case mutations are popped from memory while // waiting for the results state Version dequeVersion = data->version.get(); state Version dequeKnownCommit = data->knownCommittedVersion.get(); state Version emptyVersion = feedInfo->emptyVersion; state Version durableValidationVersion = std::min(data->durableVersion.get(), feedInfo->durableFetchVersion.get()); + state Version lastMemoryVersion = invalidVersion; + state Version lastMemoryKnownCommitted = invalidVersion; Version fetchStorageVersion = std::max(feedInfo->fetchVersion, feedInfo->durableFetchVersion.get()); - state bool doFilterMutations = !req.range.contains(feedInfo->range); state bool doValidation = EXPENSIVE_VALIDATION; if (DEBUG_CF_TRACE) { @@ -2858,14 +2866,18 @@ ACTOR Future> getChangeFeedMutations(Stor break; } MutationsAndVersionRef m = *it; + + remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize(); if (doFilterMutations) { - m = filterMutations(memoryReply.arena, *it, req.range, inverted); + m = filterMutations(memoryReply.arena, *it, req.range, inverted, commonFeedPrefixLength); } if (m.mutations.size()) { memoryReply.arena.dependsOn(it->arena()); memoryReply.mutations.push_back(memoryReply.arena, m); - remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize(); } + + lastMemoryVersion = m.version; + lastMemoryKnownCommitted = m.knownCommittedVersion; it++; } } @@ -2957,7 +2969,7 @@ ACTOR Future> getChangeFeedMutations(Stor MutationsAndVersionRef m = MutationsAndVersionRef(mutations, version, knownCommittedVersion); if (doFilterMutations) { - m = filterMutations(reply.arena, m, req.range, inverted); + m = filterMutations(reply.arena, m, req.range, inverted, commonFeedPrefixLength); } if (m.mutations.size()) { reply.arena.dependsOn(mutations.arena()); @@ -3044,9 +3056,15 @@ ACTOR Future> getChangeFeedMutations(Stor reply.mutations.append(reply.arena, it, totalCount); // If still empty, that means disk results were filtered out, but skipped all memory results. Add an empty, // either the last version from disk - if (reply.mutations.empty() && res.size()) { - CODE_PROBE(true, "Change feed adding empty version after disk + memory filtered"); - reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastVersion, lastKnownCommitted)); + if (reply.mutations.empty()) { + if (res.size() || (lastMemoryVersion != invalidVersion && remainingLimitBytes <= 0)) { + CODE_PROBE(true, "Change feed adding empty version after disk + memory filtered"); + if (res.empty()) { + lastVersion = lastMemoryVersion; + lastKnownCommitted = lastMemoryKnownCommitted; + } + reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastVersion, lastKnownCommitted)); + } } } else if (reply.mutations.empty() || reply.mutations.back().version < lastVersion) { CODE_PROBE(true, "Change feed adding empty version after disk filtered"); @@ -3055,6 +3073,14 @@ ACTOR Future> getChangeFeedMutations(Stor } else { reply = memoryReply; *feedDiskReadState = FeedDiskReadState::NORMAL; + + // if we processed memory results that got entirely or mostly filtered, but we're not caught up, add an empty at + // the end + if ((reply.mutations.empty() || reply.mutations.back().version < lastMemoryVersion) && + remainingLimitBytes <= 0) { + CODE_PROBE(true, "Memory feed adding empty version after memory filtered"); + reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastMemoryVersion, lastMemoryKnownCommitted)); + } } bool gotAll = remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion; @@ -3215,6 +3241,10 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques state bool removeUID = false; state FeedDiskReadState feedDiskReadState = STARTING; state Optional blockedVersion; + state Reference feedInfo; + state Future streamEndReached; + state bool doFilterMutations; + state int commonFeedPrefixLength; try { ++data->counters.feedStreamQueries; @@ -3254,7 +3284,26 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()); } - wait(success(waitForVersionNoTooOld(data, req.begin))); + Version checkTooOldVersion = (!req.canReadPopped || req.end == MAX_VERSION) ? req.begin : req.end; + wait(success(waitForVersionNoTooOld(data, checkTooOldVersion))); + + // set persistent references to map data structures to not have to re-look them up every loop + auto feed = data->uidChangeFeed.find(req.rangeID); + if (feed == data->uidChangeFeed.end() || feed->second->removing) { + req.reply.sendError(unknown_change_feed()); + // throw to delete from changeFeedClientVersions if present + throw unknown_change_feed(); + } + feedInfo = feed->second; + + streamEndReached = + (req.end == std::numeric_limits::max()) ? Never() : data->version.whenAtLeast(req.end); + + doFilterMutations = !req.range.contains(feedInfo->range); + commonFeedPrefixLength = 0; + if (doFilterMutations) { + commonFeedPrefixLength = commonPrefixLength(feedInfo->range.begin, feedInfo->range.end); + } // send an empty version at begin - 1 to establish the stream quickly ChangeFeedStreamReply emptyInitialReply; @@ -3298,8 +3347,8 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques wait(onReady); // keep this as not state variable so it is freed after sending to reduce memory - Future> feedReplyFuture = - getChangeFeedMutations(data, req, false, atLatest, &feedDiskReadState); + Future> feedReplyFuture = getChangeFeedMutations( + data, feedInfo, req, false, atLatest, doFilterMutations, commonFeedPrefixLength, &feedDiskReadState); if (atLatest && !removeUID && !feedReplyFuture.isReady()) { data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] = blockedVersion.present() ? blockedVersion.get() : data->prevVersion; @@ -3330,11 +3379,10 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques Version minVersion = removeUID ? data->version.get() : data->prevVersion; if (removeUID) { if (gotAll || req.begin == req.end) { - data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(req.id); + clientVersions.erase(req.id); removeUID = false; } else { - data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] = - feedReply.mutations.back().version; + clientVersions[req.id] = feedReply.mutations.back().version; } } @@ -3355,19 +3403,16 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques } if (gotAll) { blockedVersion = Optional(); - auto feed = data->uidChangeFeed.find(req.rangeID); - if (feed == data->uidChangeFeed.end() || feed->second->removing) { + if (feedInfo->removing) { req.reply.sendError(unknown_change_feed()); // throw to delete from changeFeedClientVersions if present throw unknown_change_feed(); } choose { - when(wait(feed->second->newMutations.onTrigger())) {} - when(wait(req.end == std::numeric_limits::max() ? Future(Never()) - : data->version.whenAtLeast(req.end))) {} + when(wait(feedInfo->newMutations.onTrigger())) {} + when(wait(streamEndReached)) {} } - auto feedItr = data->uidChangeFeed.find(req.rangeID); - if (feedItr == data->uidChangeFeed.end() || feedItr->second->removing) { + if (feedInfo->removing) { req.reply.sendError(unknown_change_feed()); // throw to delete from changeFeedClientVersions if present throw unknown_change_feed();