From 27151b08312a8ab469f0743d6f82f1db5085d88b Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Wed, 1 Sep 2021 14:35:51 -0700 Subject: [PATCH] support reads from range feeds that span multiple storage servers --- fdbclient/NativeAPI.actor.cpp | 237 ++++++++++++++++++++++++----- fdbclient/StorageServerInterface.h | 6 +- fdbserver/storageserver.actor.cpp | 11 +- 3 files changed, 210 insertions(+), 44 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 78ecd5c3f8..993391b237 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6579,6 +6579,115 @@ Future>> DatabaseContext::getChange return getChangeFeedMutationsActor(Reference::addRef(this), rangeID, begin, end, range); } +ACTOR Future singleChangeFeedStream(StorageServerInterface interf, + PromiseStream> results, + Key rangeID, + Version begin, + Version end, + KeyRange range) { + loop { + try { + state ChangeFeedStreamRequest req; + req.rangeID = rangeID; + req.begin = begin; + req.end = end; + req.range = range; + + state ReplyPromiseStream replyStream = interf.changeFeedStream.getReplyStream(req); + + loop { + state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture()); + begin = rep.mutations.back().version + 1; + state int resultLoc = 0; + while (resultLoc < rep.mutations.size()) { + if (rep.mutations[resultLoc].mutations.size() || rep.mutations[resultLoc].version + 1 == end) { + wait(results.onEmpty()); + results.send(rep.mutations[resultLoc]); + } + resultLoc++; + } + if (begin == end) { + return Void(); + } + } + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || + e.code() == error_code_connection_failed) { + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY)); + } else { + results.sendError(e); + return Void(); + } + } + } +} + +struct MutationAndVersionStream { + Standalone next; + PromiseStream> results; + + bool operator<(MutationAndVersionStream const& rhs) const { return next.version < rhs.next.version; } +}; + +ACTOR Future mergeChangeFeedStream(std::vector> interfs, + PromiseStream>> results, + Key rangeID, + Version* begin, + Version end) { + state std::priority_queue> mutations; + state std::vector> fetchers(interfs.size()); + state std::vector streams(interfs.size()); + for (int i = 0; i < interfs.size(); i++) { + fetchers[i] = + singleChangeFeedStream(interfs[i].first, streams[i].results, rangeID, *begin, end, interfs[i].second); + } + state int interfNum = 0; + while (interfNum < interfs.size()) { + try { + Standalone res = waitNext(streams[interfNum].results.getFuture()); + streams[interfNum].next = res; + mutations.push(streams[interfNum]); + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw e; + } + } + interfNum++; + } + state Version checkVersion = invalidVersion; + state Standalone> nextOut; + while (mutations.size()) { + state MutationAndVersionStream nextStream = mutations.top(); + mutations.pop(); + ASSERT(nextStream.next.version >= checkVersion); + if (nextStream.next.version != checkVersion) { + if (nextOut.size()) { + *begin = checkVersion + 1; + results.send(nextOut); + nextOut = Standalone>(); + } + checkVersion = nextStream.next.version; + } + nextOut.push_back_deep(nextOut.arena(), nextStream.next); + try { + Standalone res = waitNext(nextStream.results.getFuture()); + nextStream.next = res; + mutations.push(nextStream); + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw e; + } + } + } + if (nextOut.size()) { + results.send(nextOut); + } + throw end_of_stream(); +} + ACTOR Future getChangeFeedStreamActor(Reference db, PromiseStream>> results, StringRef rangeID, @@ -6609,26 +6718,27 @@ ACTOR Future getChangeFeedStreamActor(Reference db, state vector>> locations = wait(getKeyRangeLocations(cx, keys, - 100, + 1000, Reverse::False, &StorageServerInterface::changeFeed, TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); - if (locations.size() > 1) { + if (locations.size() >= 1000) { results.sendError(unsupported_operation()); return Void(); } - state int useIdx = -1; - - loop { + state std::vector chosenLocations(locations.size()); + state int loc = 0; + while (loc < locations.size()) { // FIXME: create a load balance function for this code so future users of reply streams do not have // to duplicate this code int count = 0; - for (int i = 0; i < locations[0].second->size(); i++) { + int useIdx = -1; + for (int i = 0; i < locations[loc].second->size(); i++) { if (!IFailureMonitor::failureMonitor() .getState( - locations[0].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint()) + locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint()) .failed) { if (deterministicRandom()->random01() <= 1.0 / ++count) { useIdx = i; @@ -6637,13 +6747,15 @@ ACTOR Future getChangeFeedStreamActor(Reference db, } if (useIdx >= 0) { - break; + chosenLocations[loc] = useIdx; + loc++; + continue; } - vector> ok(locations[0].second->size()); + vector> ok(locations[loc].second->size()); for (int i = 0; i < ok.size(); i++) { ok[i] = IFailureMonitor::failureMonitor().onStateEqual( - locations[0].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint(), + locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint(), FailureStatus(false)); } @@ -6654,23 +6766,36 @@ ACTOR Future getChangeFeedStreamActor(Reference db, } wait(allAlternativesFailedDelay(quorum(ok, 1))); + loc = 0; } - state ChangeFeedStreamRequest req; - req.rangeID = rangeID; - req.begin = begin; - req.end = end; + if (locations.size() > 1) { + std::vector> interfs; + for (int i = 0; i < locations.size(); i++) { + interfs.push_back( + std::make_pair(locations[i].second->getInterface(chosenLocations[i]), locations[i].first)); + } + wait(mergeChangeFeedStream(interfs, results, rangeID, &begin, end) || cx->connectionFileChanged()); + } else { + state ChangeFeedStreamRequest req; + req.rangeID = rangeID; + req.begin = begin; + req.end = end; + req.range = range; - state ReplyPromiseStream replyStream = - locations[0].second->get(useIdx, &StorageServerInterface::changeFeedStream).getReplyStream(req); + state ReplyPromiseStream replyStream = + locations[0] + .second->get(chosenLocations[0], &StorageServerInterface::changeFeedStream) + .getReplyStream(req); - loop { - wait(results.onEmpty()); - choose { - when(wait(cx->connectionFileChanged())) { break; } - when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) { - begin = rep.mutations.back().version + 1; - results.send(Standalone>(rep.mutations, rep.arena)); + loop { + wait(results.onEmpty()); + choose { + when(wait(cx->connectionFileChanged())) { break; } + when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) { + begin = rep.mutations.back().version + 1; + results.send(Standalone>(rep.mutations, rep.arena)); + } } } } @@ -6699,6 +6824,29 @@ Future DatabaseContext::getChangeFeedStream( return getChangeFeedStreamActor(Reference::addRef(this), results, rangeID, begin, end, range); } +ACTOR Future>> singleLocationOverlappingChangeFeeds( + Database cx, + Reference location, + KeyRangeRef range, + Version minVersion) { + state OverlappingChangeFeedsRequest req; + req.range = range; + req.minVersion = minVersion; + + OverlappingChangeFeedsReply rep = wait(loadBalance(cx.getPtr(), + location, + &StorageServerInterface::overlappingChangeFeeds, + req, + TaskPriority::DefaultPromiseEndpoint, + AtMostOnce::False, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr)); + return rep.rangeIds; +} + +bool compareChangeFeedResult(const std::pair& i, const std::pair& j) { + return i.first < j.first; +} + ACTOR Future>> getOverlappingChangeFeedsActor(Reference db, KeyRangeRef range, Version minVersion) { @@ -6708,26 +6856,28 @@ ACTOR Future>> getOverlappingChangeFeedsAct state vector>> locations = wait(getKeyRangeLocations(cx, range, - 100, + 1000, Reverse::False, &StorageServerInterface::changeFeed, TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); - if (locations.size() > 1) { + if (locations.size() >= 1000) { throw unsupported_operation(); } - state OverlappingChangeFeedsRequest req; - req.range = range; + state std::vector>>> allOverlappingRequests; + for (auto& it : locations) { + allOverlappingRequests.push_back(singleLocationOverlappingChangeFeeds(cx, it.second, range, minVersion)); + } + wait(waitForAll(allOverlappingRequests)); - OverlappingChangeFeedsReply rep = wait(loadBalance(cx.getPtr(), - locations[0].second, - &StorageServerInterface::overlappingChangeFeeds, - req, - TaskPriority::DefaultPromiseEndpoint, - AtMostOnce::False, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr)); - return rep.rangeIds; + std::vector> result; + for (auto& it : allOverlappingRequests) { + result.insert(result.end(), it.get().begin(), it.get().end()); + } + std::sort(result.begin(), result.end(), compareChangeFeedResult); + result.resize(std::unique(result.begin(), result.end()) - result.begin()); + return result; } Future>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range, @@ -6748,20 +6898,27 @@ ACTOR Future popChangeFeedMutationsActor(Reference db, St state vector>> locations = wait(getKeyRangeLocations(cx, keys, - 100, + 1000, Reverse::False, &StorageServerInterface::changeFeed, TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); - if (locations.size() > 1) { + if (locations.size() >= 1000) { throw unsupported_operation(); } + state std::vector allInterfs; + for (auto& it : locations) { + for (int i = 0; i < it.second->size(); i++) { + allInterfs.push_back(it.second->getInterface(i)); + } + } + uniquify(allInterfs); + // FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped state std::vector> popRequests; - for (int i = 0; i < locations[0].second->size(); i++) { - popRequests.push_back( - locations[0].second->getInterface(i).changeFeedPop.getReply(ChangeFeedPopRequest(rangeID, version))); + for (int i = 0; i < allInterfs.size(); i++) { + popRequests.push_back(allInterfs[i].changeFeedPop.getReply(ChangeFeedPopRequest(rangeID, version))); } wait(waitForAll(popRequests)); return Void(); diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index e11491e0b2..f644c7cee3 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -709,12 +709,13 @@ struct ChangeFeedStreamRequest { Key rangeID; Version begin = 0; Version end = 0; + KeyRange range; ReplyPromiseStream reply; ChangeFeedStreamRequest() {} template void serialize(Ar& ar) { - serializer(ar, rangeID, begin, end, reply, spanContext, arena); + serializer(ar, rangeID, begin, end, range, reply, spanContext, arena); } }; @@ -751,6 +752,7 @@ struct OverlappingChangeFeedsReply { struct OverlappingChangeFeedsRequest { constexpr static FileIdentifier file_identifier = 10726174; KeyRange range; + Version minVersion; ReplyPromise reply; OverlappingChangeFeedsRequest() {} @@ -758,7 +760,7 @@ struct OverlappingChangeFeedsRequest { template void serialize(Ar& ar) { - serializer(ar, range, reply); + serializer(ar, range, minVersion, reply); } }; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 3e8c26b527..2d22dbce57 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1538,6 +1538,7 @@ ACTOR Future changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) { wait(delay(0)); + wait(data->version.whenAtLeast(req.minVersion)); auto ranges = data->keyChangeFeed.intersectingRanges(req.range); std::map rangeIds; for (auto r : ranges) { @@ -1671,8 +1672,6 @@ ACTOR Future changeFeedQ(StorageServer* data, ChangeFeedRequest req) { } ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req) -// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large -// selector offset prevents all data from being read in one range read { state Span span("SS:getChangeFeedStream"_loc, { req.spanContext }); state Version begin = req.begin; @@ -1680,6 +1679,11 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques wait(delay(0, TaskPriority::DefaultEndpoint)); + state uint64_t changeCounter = data->shardChangeCounter; + if (!data->isReadable(req.range)) { + throw wrong_shard_server(); + } + try { loop { wait(req.reply.onReady()); @@ -1688,6 +1692,9 @@ ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques feedRequest.begin = begin; feedRequest.end = req.end; ChangeFeedReply feedReply = wait(getChangeFeedMutations(data, feedRequest)); + + data->checkChangeCounter(changeCounter, req.range); + begin = feedReply.mutations.back().version + 1; req.reply.send(ChangeFeedStreamReply(feedReply)); if (feedReply.mutations.back().version == req.end - 1) {