More ss cf perf fixes main (#9109)

* changing future version logic for change feed fetch

* Optimizing change feed data structures and accesses

* coalescing change feed request ranges for merge cursor if they're to the same team

* fixing over-read of memory mutations for change feeds

* feed filter mutations common prefix cpu optimization

* fix formatting
This commit is contained in:
Josh Slocum 2023-01-12 16:15:05 -06:00 committed by GitHub
parent 507f59956a
commit b6450f9eaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 128 additions and 32 deletions

View File

@ -81,7 +81,8 @@ void ClientKnobs::initialize(Randomize randomize) {
init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1;
init( CHANGE_FEED_POP_TIMEOUT, 10.0 );
init( CHANGE_FEED_STREAM_MIN_BYTES, 1e4 ); if( randomize && BUGGIFY ) CHANGE_FEED_STREAM_MIN_BYTES = 1;
init( CHANGE_FEED_START_INTERVAL, 10.0 );
init( CHANGE_FEED_START_INTERVAL, 20.0 ); if( randomize && BUGGIFY ) CHANGE_FEED_START_INTERVAL = 10.0;
init( CHANGE_FEED_COALESCE_LOCATIONS, true ); if( randomize && BUGGIFY ) CHANGE_FEED_COALESCE_LOCATIONS = false;
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;

View File

@ -10185,6 +10185,51 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
return Void();
}
void coalesceChangeFeedLocations(std::vector<KeyRangeLocationInfo>& locations) {
// only coalesce if same tenant
if (locations.front().tenantEntry.id != locations.back().tenantEntry.id) {
return;
}
std::vector<UID> teamUIDs;
bool anyToCoalesce = false;
teamUIDs.reserve(locations.size());
for (int i = 0; i < locations.size(); i++) {
ASSERT(locations[i].locations->size() > 0);
UID teamUID = locations[i].locations->getId(0);
for (int j = 1; j < locations[i].locations->size(); j++) {
UID locUID = locations[i].locations->getId(j);
teamUID = UID(teamUID.first() ^ locUID.first(), teamUID.second() ^ locUID.second());
}
if (!teamUIDs.empty() && teamUIDs.back() == teamUID) {
anyToCoalesce = true;
}
teamUIDs.push_back(teamUID);
}
if (!anyToCoalesce) {
return;
}
CODE_PROBE(true, "coalescing change feed locations");
// FIXME: there's technically a probability of "hash" collisions here, but it's extremely low. Could validate that
// two teams with the same xor are in fact the same, or fall back to not doing this if it gets a wrong shard server
// error or something
std::vector<KeyRangeLocationInfo> coalesced;
coalesced.reserve(locations.size());
coalesced.push_back(locations[0]);
for (int i = 1; i < locations.size(); i++) {
if (teamUIDs[i] == teamUIDs[i - 1]) {
coalesced.back().range = KeyRangeRef(coalesced.back().range.begin, locations[i].range.end);
} else {
coalesced.push_back(locations[i]);
}
}
locations = coalesced;
}
ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
Reference<ChangeFeedData> results,
Key rangeID,
@ -10226,6 +10271,10 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
throw unknown_change_feed();
}
if (CLIENT_KNOBS->CHANGE_FEED_COALESCE_LOCATIONS && locations.size() > 1) {
coalesceChangeFeedLocations(locations);
}
state std::vector<int> chosenLocations(locations.size());
state int loc = 0;
while (loc < locations.size()) {

View File

@ -79,6 +79,7 @@ public:
double CHANGE_FEED_POP_TIMEOUT;
int64_t CHANGE_FEED_STREAM_MIN_BYTES;
double CHANGE_FEED_START_INTERVAL;
bool CHANGE_FEED_COALESCE_LOCATIONS;
int MAX_BATCH_SIZE;
double GRV_BATCH_TIMEOUT;

View File

@ -1018,12 +1018,12 @@ public:
KeyRangeMap<bool> cachedRangeMap; // indicates if a key-range is being cached
KeyRangeMap<std::vector<Reference<ChangeFeedInfo>>> keyChangeFeed;
std::map<Key, Reference<ChangeFeedInfo>> uidChangeFeed;
std::unordered_map<Key, Reference<ChangeFeedInfo>> uidChangeFeed;
Deque<std::pair<std::vector<Key>, Version>> changeFeedVersions;
std::map<UID, PromiseStream<Key>> changeFeedDestroys;
std::set<Key> currentChangeFeeds;
std::set<Key> fetchingChangeFeeds;
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
std::unordered_map<NetworkAddress, std::unordered_map<UID, Version>> changeFeedClientVersions;
std::unordered_map<Key, Version> changeFeedCleanupDurable;
int64_t activeFeedQueries = 0;
int64_t changeFeedMemoryBytes = 0;
@ -2645,7 +2645,8 @@ MutationsAndVersionRef filterMutationsInverted(Arena& arena, MutationsAndVersion
MutationsAndVersionRef filterMutations(Arena& arena,
MutationsAndVersionRef const& m,
KeyRange const& range,
bool inverted) {
bool inverted,
int commonPrefixLength) {
if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) {
return m;
}
@ -2657,22 +2658,28 @@ MutationsAndVersionRef filterMutations(Arena& arena,
Optional<VectorRef<MutationRef>> modifiedMutations;
for (int i = 0; i < m.mutations.size(); i++) {
if (m.mutations[i].type == MutationRef::SetValue) {
if (modifiedMutations.present() && range.contains(m.mutations[i].param1)) {
bool inRange = range.begin.compareSuffix(m.mutations[i].param1, commonPrefixLength) <= 0 &&
m.mutations[i].param1.compareSuffix(range.end, commonPrefixLength) < 0;
if (modifiedMutations.present() && inRange) {
modifiedMutations.get().push_back(arena, m.mutations[i]);
}
if (!modifiedMutations.present() && !range.contains(m.mutations[i].param1)) {
if (!modifiedMutations.present() && !inRange) {
modifiedMutations = m.mutations.slice(0, i);
arena.dependsOn(range.arena());
}
} else {
ASSERT(m.mutations[i].type == MutationRef::ClearRange);
// param1 < range.begin || param2 > range.end
if (!modifiedMutations.present() &&
(m.mutations[i].param1 < range.begin || m.mutations[i].param2 > range.end)) {
(m.mutations[i].param1.compareSuffix(range.begin, commonPrefixLength) < 0 ||
m.mutations[i].param2.compareSuffix(range.end, commonPrefixLength) > 0)) {
modifiedMutations = m.mutations.slice(0, i);
arena.dependsOn(range.arena());
}
if (modifiedMutations.present()) {
if (m.mutations[i].param1 < range.end && range.begin < m.mutations[i].param2) {
// param1 < range.end && range.begin < param2
if (m.mutations[i].param1.compareSuffix(range.end, commonPrefixLength) < 0 &&
range.begin.compareSuffix(m.mutations[i].param2, commonPrefixLength) < 0) {
modifiedMutations.get().push_back(arena,
MutationRef(MutationRef::ClearRange,
std::max(range.begin, m.mutations[i].param1),
@ -2776,9 +2783,12 @@ static std::deque<Standalone<MutationsAndVersionRef>>::const_iterator searchChan
enum FeedDiskReadState { STARTING, NORMAL, DISK_CATCHUP };
ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
Reference<ChangeFeedInfo> feedInfo,
ChangeFeedStreamRequest req,
bool inverted,
bool atLatest,
bool doFilterMutations,
int commonFeedPrefixLength,
FeedDiskReadState* feedDiskReadState) {
state ChangeFeedStreamReply reply;
state ChangeFeedStreamReply memoryReply;
@ -2809,21 +2819,19 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
throw wrong_shard_server();
}
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end()) {
if (feedInfo->removing) {
throw unknown_change_feed();
}
state Reference<ChangeFeedInfo> feedInfo = feed->second;
// We must copy the mutationDeque when fetching the durable bytes in case mutations are popped from memory while
// waiting for the results
state Version dequeVersion = data->version.get();
state Version dequeKnownCommit = data->knownCommittedVersion.get();
state Version emptyVersion = feedInfo->emptyVersion;
state Version durableValidationVersion = std::min(data->durableVersion.get(), feedInfo->durableFetchVersion.get());
state Version lastMemoryVersion = invalidVersion;
state Version lastMemoryKnownCommitted = invalidVersion;
Version fetchStorageVersion = std::max(feedInfo->fetchVersion, feedInfo->durableFetchVersion.get());
state bool doFilterMutations = !req.range.contains(feedInfo->range);
state bool doValidation = EXPENSIVE_VALIDATION;
if (DEBUG_CF_TRACE) {
@ -2858,14 +2866,18 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
break;
}
MutationsAndVersionRef m = *it;
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
if (doFilterMutations) {
m = filterMutations(memoryReply.arena, *it, req.range, inverted);
m = filterMutations(memoryReply.arena, *it, req.range, inverted, commonFeedPrefixLength);
}
if (m.mutations.size()) {
memoryReply.arena.dependsOn(it->arena());
memoryReply.mutations.push_back(memoryReply.arena, m);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
}
lastMemoryVersion = m.version;
lastMemoryKnownCommitted = m.knownCommittedVersion;
it++;
}
}
@ -2957,7 +2969,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
MutationsAndVersionRef m = MutationsAndVersionRef(mutations, version, knownCommittedVersion);
if (doFilterMutations) {
m = filterMutations(reply.arena, m, req.range, inverted);
m = filterMutations(reply.arena, m, req.range, inverted, commonFeedPrefixLength);
}
if (m.mutations.size()) {
reply.arena.dependsOn(mutations.arena());
@ -3044,10 +3056,16 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
reply.mutations.append(reply.arena, it, totalCount);
// If still empty, that means disk results were filtered out, but skipped all memory results. Add an empty,
// either the last version from disk
if (reply.mutations.empty() && res.size()) {
if (reply.mutations.empty()) {
if (res.size() || (lastMemoryVersion != invalidVersion && remainingLimitBytes <= 0)) {
CODE_PROBE(true, "Change feed adding empty version after disk + memory filtered");
if (res.empty()) {
lastVersion = lastMemoryVersion;
lastKnownCommitted = lastMemoryKnownCommitted;
}
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastVersion, lastKnownCommitted));
}
}
} else if (reply.mutations.empty() || reply.mutations.back().version < lastVersion) {
CODE_PROBE(true, "Change feed adding empty version after disk filtered");
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastVersion, lastKnownCommitted));
@ -3055,6 +3073,14 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
} else {
reply = memoryReply;
*feedDiskReadState = FeedDiskReadState::NORMAL;
// if we processed memory results that got entirely or mostly filtered, but we're not caught up, add an empty at
// the end
if ((reply.mutations.empty() || reply.mutations.back().version < lastMemoryVersion) &&
remainingLimitBytes <= 0) {
CODE_PROBE(true, "Memory feed adding empty version after memory filtered");
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(lastMemoryVersion, lastMemoryKnownCommitted));
}
}
bool gotAll = remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion;
@ -3215,6 +3241,10 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
state bool removeUID = false;
state FeedDiskReadState feedDiskReadState = STARTING;
state Optional<Version> blockedVersion;
state Reference<ChangeFeedInfo> feedInfo;
state Future<Void> streamEndReached;
state bool doFilterMutations;
state int commonFeedPrefixLength;
try {
++data->counters.feedStreamQueries;
@ -3254,7 +3284,26 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
wait(success(waitForVersionNoTooOld(data, req.begin)));
Version checkTooOldVersion = (!req.canReadPopped || req.end == MAX_VERSION) ? req.begin : req.end;
wait(success(waitForVersionNoTooOld(data, checkTooOldVersion)));
// set persistent references to map data structures to not have to re-look them up every loop
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
req.reply.sendError(unknown_change_feed());
// throw to delete from changeFeedClientVersions if present
throw unknown_change_feed();
}
feedInfo = feed->second;
streamEndReached =
(req.end == std::numeric_limits<Version>::max()) ? Never() : data->version.whenAtLeast(req.end);
doFilterMutations = !req.range.contains(feedInfo->range);
commonFeedPrefixLength = 0;
if (doFilterMutations) {
commonFeedPrefixLength = commonPrefixLength(feedInfo->range.begin, feedInfo->range.end);
}
// send an empty version at begin - 1 to establish the stream quickly
ChangeFeedStreamReply emptyInitialReply;
@ -3298,8 +3347,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
wait(onReady);
// keep this as not state variable so it is freed after sending to reduce memory
Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture =
getChangeFeedMutations(data, req, false, atLatest, &feedDiskReadState);
Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture = getChangeFeedMutations(
data, feedInfo, req, false, atLatest, doFilterMutations, commonFeedPrefixLength, &feedDiskReadState);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
@ -3330,11 +3379,10 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
Version minVersion = removeUID ? data->version.get() : data->prevVersion;
if (removeUID) {
if (gotAll || req.begin == req.end) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()].erase(req.id);
clientVersions.erase(req.id);
removeUID = false;
} else {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
feedReply.mutations.back().version;
clientVersions[req.id] = feedReply.mutations.back().version;
}
}
@ -3355,19 +3403,16 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
}
if (gotAll) {
blockedVersion = Optional<Version>();
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
if (feedInfo->removing) {
req.reply.sendError(unknown_change_feed());
// throw to delete from changeFeedClientVersions if present
throw unknown_change_feed();
}
choose {
when(wait(feed->second->newMutations.onTrigger())) {}
when(wait(req.end == std::numeric_limits<Version>::max() ? Future<Void>(Never())
: data->version.whenAtLeast(req.end))) {}
when(wait(feedInfo->newMutations.onTrigger())) {}
when(wait(streamEndReached)) {}
}
auto feedItr = data->uidChangeFeed.find(req.rangeID);
if (feedItr == data->uidChangeFeed.end() || feedItr->second->removing) {
if (feedInfo->removing) {
req.reply.sendError(unknown_change_feed());
// throw to delete from changeFeedClientVersions if present
throw unknown_change_feed();