support reads from range feeds that span multiple storage servers

This commit is contained in:
Evan Tschannen 2021-09-01 14:35:51 -07:00
parent a278d2977a
commit 27151b0831
3 changed files with 210 additions and 44 deletions

View File

@ -6579,6 +6579,115 @@ Future<Standalone<VectorRef<MutationsAndVersionRef>>> DatabaseContext::getChange
return getChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, begin, end, range); return getChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, begin, end, range);
} }
ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
PromiseStream<Standalone<MutationsAndVersionRef>> results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
loop {
try {
state ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
req.range = range;
state ReplyPromiseStream<ChangeFeedStreamReply> replyStream = interf.changeFeedStream.getReplyStream(req);
loop {
state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture());
begin = rep.mutations.back().version + 1;
state int resultLoc = 0;
while (resultLoc < rep.mutations.size()) {
if (rep.mutations[resultLoc].mutations.size() || rep.mutations[resultLoc].version + 1 == end) {
wait(results.onEmpty());
results.send(rep.mutations[resultLoc]);
}
resultLoc++;
}
if (begin == end) {
return Void();
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
e.code() == error_code_connection_failed) {
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
} else {
results.sendError(e);
return Void();
}
}
}
}
struct MutationAndVersionStream {
Standalone<MutationsAndVersionRef> next;
PromiseStream<Standalone<MutationsAndVersionRef>> results;
bool operator<(MutationAndVersionStream const& rhs) const { return next.version < rhs.next.version; }
};
ACTOR Future<Void> mergeChangeFeedStream(std::vector<std::pair<StorageServerInterface, KeyRange>> interfs,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
Key rangeID,
Version* begin,
Version end) {
state std::priority_queue<MutationAndVersionStream, std::vector<MutationAndVersionStream>> mutations;
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
for (int i = 0; i < interfs.size(); i++) {
fetchers[i] =
singleChangeFeedStream(interfs[i].first, streams[i].results, rangeID, *begin, end, interfs[i].second);
}
state int interfNum = 0;
while (interfNum < interfs.size()) {
try {
Standalone<MutationsAndVersionRef> res = waitNext(streams[interfNum].results.getFuture());
streams[interfNum].next = res;
mutations.push(streams[interfNum]);
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
throw e;
}
}
interfNum++;
}
state Version checkVersion = invalidVersion;
state Standalone<VectorRef<MutationsAndVersionRef>> nextOut;
while (mutations.size()) {
state MutationAndVersionStream nextStream = mutations.top();
mutations.pop();
ASSERT(nextStream.next.version >= checkVersion);
if (nextStream.next.version != checkVersion) {
if (nextOut.size()) {
*begin = checkVersion + 1;
results.send(nextOut);
nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
}
checkVersion = nextStream.next.version;
}
nextOut.push_back_deep(nextOut.arena(), nextStream.next);
try {
Standalone<MutationsAndVersionRef> res = waitNext(nextStream.results.getFuture());
nextStream.next = res;
mutations.push(nextStream);
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
throw e;
}
}
}
if (nextOut.size()) {
results.send(nextOut);
}
throw end_of_stream();
}
ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db, ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results, PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results,
StringRef rangeID, StringRef rangeID,
@ -6609,26 +6718,27 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
state vector<pair<KeyRange, Reference<LocationInfo>>> locations = state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx, wait(getKeyRangeLocations(cx,
keys, keys,
100, 1000,
Reverse::False, Reverse::False,
&StorageServerInterface::changeFeed, &StorageServerInterface::changeFeed,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() > 1) { if (locations.size() >= 1000) {
results.sendError(unsupported_operation()); results.sendError(unsupported_operation());
return Void(); return Void();
} }
state int useIdx = -1; state std::vector<int> chosenLocations(locations.size());
state int loc = 0;
loop { while (loc < locations.size()) {
// FIXME: create a load balance function for this code so future users of reply streams do not have // FIXME: create a load balance function for this code so future users of reply streams do not have
// to duplicate this code // to duplicate this code
int count = 0; int count = 0;
for (int i = 0; i < locations[0].second->size(); i++) { int useIdx = -1;
for (int i = 0; i < locations[loc].second->size(); i++) {
if (!IFailureMonitor::failureMonitor() if (!IFailureMonitor::failureMonitor()
.getState( .getState(
locations[0].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint()) locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint())
.failed) { .failed) {
if (deterministicRandom()->random01() <= 1.0 / ++count) { if (deterministicRandom()->random01() <= 1.0 / ++count) {
useIdx = i; useIdx = i;
@ -6637,13 +6747,15 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
} }
if (useIdx >= 0) { if (useIdx >= 0) {
break; chosenLocations[loc] = useIdx;
loc++;
continue;
} }
vector<Future<Void>> ok(locations[0].second->size()); vector<Future<Void>> ok(locations[loc].second->size());
for (int i = 0; i < ok.size(); i++) { for (int i = 0; i < ok.size(); i++) {
ok[i] = IFailureMonitor::failureMonitor().onStateEqual( ok[i] = IFailureMonitor::failureMonitor().onStateEqual(
locations[0].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint(), locations[loc].second->get(i, &StorageServerInterface::changeFeedStream).getEndpoint(),
FailureStatus(false)); FailureStatus(false));
} }
@ -6654,23 +6766,36 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
} }
wait(allAlternativesFailedDelay(quorum(ok, 1))); wait(allAlternativesFailedDelay(quorum(ok, 1)));
loc = 0;
} }
state ChangeFeedStreamRequest req; if (locations.size() > 1) {
req.rangeID = rangeID; std::vector<std::pair<StorageServerInterface, KeyRange>> interfs;
req.begin = begin; for (int i = 0; i < locations.size(); i++) {
req.end = end; interfs.push_back(
std::make_pair(locations[i].second->getInterface(chosenLocations[i]), locations[i].first));
}
wait(mergeChangeFeedStream(interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
} else {
state ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
req.range = range;
state ReplyPromiseStream<ChangeFeedStreamReply> replyStream = state ReplyPromiseStream<ChangeFeedStreamReply> replyStream =
locations[0].second->get(useIdx, &StorageServerInterface::changeFeedStream).getReplyStream(req); locations[0]
.second->get(chosenLocations[0], &StorageServerInterface::changeFeedStream)
.getReplyStream(req);
loop { loop {
wait(results.onEmpty()); wait(results.onEmpty());
choose { choose {
when(wait(cx->connectionFileChanged())) { break; } when(wait(cx->connectionFileChanged())) { break; }
when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) { when(ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
begin = rep.mutations.back().version + 1; begin = rep.mutations.back().version + 1;
results.send(Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena)); results.send(Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
}
} }
} }
} }
@ -6699,6 +6824,29 @@ Future<Void> DatabaseContext::getChangeFeedStream(
return getChangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range); return getChangeFeedStreamActor(Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range);
} }
ACTOR Future<std::vector<std::pair<Key, KeyRange>>> singleLocationOverlappingChangeFeeds(
Database cx,
Reference<LocationInfo> location,
KeyRangeRef range,
Version minVersion) {
state OverlappingChangeFeedsRequest req;
req.range = range;
req.minVersion = minVersion;
OverlappingChangeFeedsReply rep = wait(loadBalance(cx.getPtr(),
location,
&StorageServerInterface::overlappingChangeFeeds,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
return rep.rangeIds;
}
bool compareChangeFeedResult(const std::pair<Key, KeyRange>& i, const std::pair<Key, KeyRange>& j) {
return i.first < j.first;
}
ACTOR Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db, ACTOR Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
KeyRangeRef range, KeyRangeRef range,
Version minVersion) { Version minVersion) {
@ -6708,26 +6856,28 @@ ACTOR Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingChangeFeedsAct
state vector<pair<KeyRange, Reference<LocationInfo>>> locations = state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx, wait(getKeyRangeLocations(cx,
range, range,
100, 1000,
Reverse::False, Reverse::False,
&StorageServerInterface::changeFeed, &StorageServerInterface::changeFeed,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() > 1) { if (locations.size() >= 1000) {
throw unsupported_operation(); throw unsupported_operation();
} }
state OverlappingChangeFeedsRequest req; state std::vector<Future<std::vector<std::pair<Key, KeyRange>>>> allOverlappingRequests;
req.range = range; for (auto& it : locations) {
allOverlappingRequests.push_back(singleLocationOverlappingChangeFeeds(cx, it.second, range, minVersion));
}
wait(waitForAll(allOverlappingRequests));
OverlappingChangeFeedsReply rep = wait(loadBalance(cx.getPtr(), std::vector<std::pair<Key, KeyRange>> result;
locations[0].second, for (auto& it : allOverlappingRequests) {
&StorageServerInterface::overlappingChangeFeeds, result.insert(result.end(), it.get().begin(), it.get().end());
req, }
TaskPriority::DefaultPromiseEndpoint, std::sort(result.begin(), result.end(), compareChangeFeedResult);
AtMostOnce::False, result.resize(std::unique(result.begin(), result.end()) - result.begin());
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr)); return result;
return rep.rangeIds;
} }
Future<std::vector<std::pair<Key, KeyRange>>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range, Future<std::vector<std::pair<Key, KeyRange>>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
@ -6748,20 +6898,27 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, St
state vector<pair<KeyRange, Reference<LocationInfo>>> locations = state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx, wait(getKeyRangeLocations(cx,
keys, keys,
100, 1000,
Reverse::False, Reverse::False,
&StorageServerInterface::changeFeed, &StorageServerInterface::changeFeed,
TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
if (locations.size() > 1) { if (locations.size() >= 1000) {
throw unsupported_operation(); throw unsupported_operation();
} }
state std::vector<StorageServerInterface> allInterfs;
for (auto& it : locations) {
for (int i = 0; i < it.second->size(); i++) {
allInterfs.push_back(it.second->getInterface(i));
}
}
uniquify(allInterfs);
// FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped // FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped
state std::vector<Future<Void>> popRequests; state std::vector<Future<Void>> popRequests;
for (int i = 0; i < locations[0].second->size(); i++) { for (int i = 0; i < allInterfs.size(); i++) {
popRequests.push_back( popRequests.push_back(allInterfs[i].changeFeedPop.getReply(ChangeFeedPopRequest(rangeID, version)));
locations[0].second->getInterface(i).changeFeedPop.getReply(ChangeFeedPopRequest(rangeID, version)));
} }
wait(waitForAll(popRequests)); wait(waitForAll(popRequests));
return Void(); return Void();

View File

@ -709,12 +709,13 @@ struct ChangeFeedStreamRequest {
Key rangeID; Key rangeID;
Version begin = 0; Version begin = 0;
Version end = 0; Version end = 0;
KeyRange range;
ReplyPromiseStream<ChangeFeedStreamReply> reply; ReplyPromiseStream<ChangeFeedStreamReply> reply;
ChangeFeedStreamRequest() {} ChangeFeedStreamRequest() {}
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, rangeID, begin, end, reply, spanContext, arena); serializer(ar, rangeID, begin, end, range, reply, spanContext, arena);
} }
}; };
@ -751,6 +752,7 @@ struct OverlappingChangeFeedsReply {
struct OverlappingChangeFeedsRequest { struct OverlappingChangeFeedsRequest {
constexpr static FileIdentifier file_identifier = 10726174; constexpr static FileIdentifier file_identifier = 10726174;
KeyRange range; KeyRange range;
Version minVersion;
ReplyPromise<OverlappingChangeFeedsReply> reply; ReplyPromise<OverlappingChangeFeedsReply> reply;
OverlappingChangeFeedsRequest() {} OverlappingChangeFeedsRequest() {}
@ -758,7 +760,7 @@ struct OverlappingChangeFeedsRequest {
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, range, reply); serializer(ar, range, minVersion, reply);
} }
}; };

View File

@ -1538,6 +1538,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) { ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) {
wait(delay(0)); wait(delay(0));
wait(data->version.whenAtLeast(req.minVersion));
auto ranges = data->keyChangeFeed.intersectingRanges(req.range); auto ranges = data->keyChangeFeed.intersectingRanges(req.range);
std::map<Key, KeyRange> rangeIds; std::map<Key, KeyRange> rangeIds;
for (auto r : ranges) { for (auto r : ranges) {
@ -1671,8 +1672,6 @@ ACTOR Future<Void> changeFeedQ(StorageServer* data, ChangeFeedRequest req) {
} }
ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req) ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req)
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{ {
state Span span("SS:getChangeFeedStream"_loc, { req.spanContext }); state Span span("SS:getChangeFeedStream"_loc, { req.spanContext });
state Version begin = req.begin; state Version begin = req.begin;
@ -1680,6 +1679,11 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
wait(delay(0, TaskPriority::DefaultEndpoint)); wait(delay(0, TaskPriority::DefaultEndpoint));
state uint64_t changeCounter = data->shardChangeCounter;
if (!data->isReadable(req.range)) {
throw wrong_shard_server();
}
try { try {
loop { loop {
wait(req.reply.onReady()); wait(req.reply.onReady());
@ -1688,6 +1692,9 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
feedRequest.begin = begin; feedRequest.begin = begin;
feedRequest.end = req.end; feedRequest.end = req.end;
ChangeFeedReply feedReply = wait(getChangeFeedMutations(data, feedRequest)); ChangeFeedReply feedReply = wait(getChangeFeedMutations(data, feedRequest));
data->checkChangeCounter(changeCounter, req.range);
begin = feedReply.mutations.back().version + 1; begin = feedReply.mutations.back().version + 1;
req.reply.send(ChangeFeedStreamReply(feedReply)); req.reply.send(ChangeFeedStreamReply(feedReply));
if (feedReply.mutations.back().version == req.end - 1) { if (feedReply.mutations.back().version == req.end - 1) {