Handled disconnects explicitly in CF streams

This commit is contained in:
Josh Slocum 2021-12-21 10:38:04 -06:00
parent abdaf5c9cd
commit d337e8fbe8
2 changed files with 117 additions and 94 deletions

View File

@ -7230,13 +7230,13 @@ Future<Void> ChangeFeedData::whenAtLeast(Version version) {
return changeFeedWhenAtLatest(Reference<ChangeFeedData>::addRef(this), version);
}
ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
PromiseStream<Standalone<MutationsAndVersionRef>> results,
ReplyPromiseStream<ChangeFeedStreamReply> replyStream,
Version end,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData,
int idx /* TODO REMOVE this param after correctness clean */) {
ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
PromiseStream<Standalone<MutationsAndVersionRef>> results,
ReplyPromiseStream<ChangeFeedStreamReply> replyStream,
Version end,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData,
int idx /* TODO REMOVE this param after correctness clean */) {
state bool atLatestVersion = false;
state Version nextVersion = 0;
try {
@ -7376,7 +7376,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
refresh.send(Void());
for (int i = 0; i < interfs.size(); i++) {
fetchers[i] = singleChangeFeedStream(
fetchers[i] = partialChangeFeedStream(
interfs[i].first, streams[i].results, results->streams[i], end, results, results->storageData[i], i);
}
state int interfNum = 0;
@ -7496,6 +7496,97 @@ ACTOR Future<KeyRange> getChangeFeedRange(Reference<DatabaseContext> db, Databas
}
}
}
// TODO better name
// Streams a change feed from a single storage server interface into `results`.
//
// Builds a ChangeFeedStreamRequest from (rangeID, range, *begin, end), resets the per-stream state held in
// `results` (streams, storageData, maxSeenVersion, refresh, notAtLatest), opens one reply stream on
// interf.changeFeedStream, and then loops forwarding every reply's mutations to results->mutations.
//
// The loop has no exit of its own, so the actor only finishes when it is cancelled or one of its waits throws.
// NOTE(review): callers are presumably expected to race it against a disconnect / connection-file-change future.
//
// Parameters:
//   db       - database context; used to erase entries from changeFeedUpdaters and to obtain the storage data
//              object for `interf`.
//   interf   - storage server interface whose changeFeedStream endpoint is queried.
//   range    - key range copied into the request.
//   results  - shared change feed state that is reset and then filled in by this actor.
//   rangeID  - change feed id copied into the request (also used in debug output).
//   begin    - in/out. Read once to seed the request, then overwritten after every reply with the last received
//              version + 1. It is dereferenced after waits, so the pointee must outlive this actor.
//   end      - end version copied into the request.
ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
                                          StorageServerInterface interf,
                                          KeyRange range,
                                          Reference<ChangeFeedData> results,
                                          Key rangeID,
                                          Version* begin,
                                          Version end) {
	state Database cx(db);
	state ChangeFeedStreamRequest req;
	req.rangeID = rangeID;
	req.begin = *begin;
	req.end = end;
	req.range = range;

	results->streams.clear();

	// Drop the updater entry for every previously tracked storage data object whose reference count is exactly 2.
	// NOTE(review): presumably a count of 2 means only `results` and the updater still hold it - confirm.
	for (auto& it : results->storageData) {
		if (it->debugGetReferenceCount() == 2) {
			db->changeFeedUpdaters.erase(it->id);
		}
	}

	results->streams.push_back(interf.changeFeedStream.getReplyStream(req));

	results->maxSeenVersion = invalidVersion;
	results->storageData.clear();
	results->storageData.push_back(db->getStorageData(interf));

	// Install a fresh refresh promise before firing the old one, so anything woken by the old promise observes the
	// new promise and the reset notAtLatest value.
	Promise<Void> refresh = results->refresh;
	results->refresh = Promise<Void>();
	results->notAtLatest.set(1);
	refresh.send(Void());

	state bool atLatest = false;
	loop {
		// Do not pull the next reply until the previously forwarded mutations have been taken from the stream.
		wait(results->mutations.onEmpty());
		state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture());
		*begin = feedReply.mutations.back().version + 1;

		// TODO REMOVE, for debugging
		if (feedReply.mutations.back().version < results->lastReturnedVersion.get()) {
			// The size is cast explicitly because it is printed with %d; passing a container size type straight
			// to a variadic printf does not match the conversion specifier.
			printf("out of order mutation for CF %s from (%d) %s! %lld < %lld\n",
			       rangeID.toString().substr(0, 6).c_str(),
			       static_cast<int>(results->storageData.size()),
			       results->storageData.empty() ? "????" : results->storageData[0]->id.toString().substr(0, 4).c_str(),
			       feedReply.mutations.back().version,
			       results->lastReturnedVersion.get());
		}
		ASSERT(feedReply.mutations.back().version >= results->lastReturnedVersion.get());

		results->mutations.send(Standalone<VectorRef<MutationsAndVersionRef>>(feedReply.mutations, feedReply.arena));

		// Because onEmpty returns here before the consuming process, we must do a delay(0)
		wait(results->mutations.onEmpty());
		wait(delay(0));

		if (DEBUG_CF_VERSION(feedReply.mutations.back().version)) {
			fmt::print("CFLR (single): {0} ({1}), atLatest={2}, rep.atLatest={3}, notAtLatest={4}, "
			           "minSV={5}\n",
			           feedReply.mutations.back().version,
			           feedReply.mutations.size(),
			           atLatest ? "T" : "F",
			           feedReply.atLatestVersion ? "T" : "F",
			           results->notAtLatest.get(),
			           feedReply.minStreamVersion);
		}

		// lastReturnedVersion only ever moves forward.
		if (feedReply.mutations.back().version > results->lastReturnedVersion.get()) {
			results->lastReturnedVersion.set(feedReply.mutations.back().version);
		}

		// The first reply flagged atLatestVersion clears notAtLatest; later replies leave it untouched.
		if (!atLatest && feedReply.atLatestVersion) {
			atLatest = true;
			results->notAtLatest.set(0);
		}

		// Advance the storage data version to the reply's minStreamVersion when that is newer.
		if (feedReply.minStreamVersion > results->storageData[0]->version.get()) {
			if (results->storageData[0]->debug) {
				// Prints the version as it is before the update on the line below.
				fmt::print("CFSD {0}: V={1} (CFLR)\n",
				           results->storageData[0]->id.toString().substr(0, 4),
				           results->storageData[0]->version.get());
			}
			results->storageData[0]->version.set(feedReply.minStreamVersion);
		}
	}
}
// Collects, for every (storage server interface, key range) pair in `interfs`, the failure monitor's
// onDisconnectOrFailure future for that interface's changeFeedStream endpoint, then waits on the whole set with
// waitForAny and returns once that wait completes. The key range half of each pair is not used.
ACTOR Future<Void> anyCFDisconnect(std::vector<std::pair<StorageServerInterface, KeyRange>> interfs) {
	state std::vector<Future<Void>> endpointFailures;
	endpointFailures.reserve(interfs.size());

	for (int i = 0; i < interfs.size(); i++) {
		auto& server = interfs[i].first;
		endpointFailures.push_back(
		    IFailureMonitor::failureMonitor().onDisconnectOrFailure(server.changeFeedStream.getEndpoint()));
	}

	wait(waitForAny(endpointFailures));
	return Void();
}
ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
Reference<ChangeFeedData> results,
@ -7571,86 +7662,18 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
interfs.push_back(std::make_pair(locations[i].second->getInterface(chosenLocations[i]),
locations[i].first & range));
}
wait(mergeChangeFeedStream(db, interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
} else {
state ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = begin;
req.end = end;
req.range = range;
results->streams.clear();
StorageServerInterface interf = locations[0].second->getInterface(chosenLocations[0]);
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->id);
}
choose {
when(wait(mergeChangeFeedStream(db, interfs, results, rangeID, &begin, end))) {}
when(wait(cx->connectionFileChanged())) {}
when(wait(anyCFDisconnect(interfs))) {}
}
results->streams.push_back(interf.changeFeedStream.getReplyStream(req));
results->maxSeenVersion = invalidVersion;
results->storageData.clear();
results->storageData.push_back(db->getStorageData(interf));
Promise<Void> refresh = results->refresh;
results->refresh = Promise<Void>();
results->notAtLatest.set(1);
refresh.send(Void());
state bool atLatest = false;
loop {
wait(results->mutations.onEmpty());
choose {
when(wait(cx->connectionFileChanged())) { break; }
when(state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture())) {
begin = feedReply.mutations.back().version + 1;
// TODO REMOVE, for debugging
if (feedReply.mutations.back().version < results->lastReturnedVersion.get()) {
printf("out of order mutation for CF %s from (%d) %s! %lld < %lld\n",
rangeID.toString().substr(0, 6).c_str(),
results->storageData.size(),
results->storageData.empty()
? "????"
: results->storageData[0]->id.toString().substr(0, 4).c_str(),
feedReply.mutations.back().version,
results->lastReturnedVersion.get());
}
ASSERT(feedReply.mutations.back().version >= results->lastReturnedVersion.get());
results->mutations.send(
Standalone<VectorRef<MutationsAndVersionRef>>(feedReply.mutations, feedReply.arena));
// Because onEmpty returns here before the consuming process, we must do a delay(0)
wait(results->mutations.onEmpty());
wait(delay(0));
if (DEBUG_CF_VERSION(feedReply.mutations.back().version)) {
fmt::print("CFLR (single): {0} ({1}), atLatest={2}, rep.atLatest={3}, notAtLatest={4}, "
"minSV={5}\n",
feedReply.mutations.back().version,
feedReply.mutations.size(),
atLatest ? "T" : "F",
feedReply.atLatestVersion ? "T" : "F",
results->notAtLatest.get(),
feedReply.minStreamVersion);
}
if (feedReply.mutations.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(feedReply.mutations.back().version);
}
if (!atLatest && feedReply.atLatestVersion) {
atLatest = true;
results->notAtLatest.set(0);
}
if (feedReply.minStreamVersion > results->storageData[0]->version.get()) {
if (results->storageData[0]->debug) {
fmt::print("CFSD {0}: V={1} (CFLR)\n",
results->storageData[0]->id.toString().substr(0, 4),
results->storageData[0]->version.get());
}
results->storageData[0]->version.set(feedReply.minStreamVersion);
}
}
}
} else {
StorageServerInterface interf = locations[0].second->getInterface(chosenLocations[0]);
choose {
when(wait(singleChangeFeedStream(db, interf, range, results, rangeID, &begin, end))) {}
when(wait(cx->connectionFileChanged())) {}
when(wait(IFailureMonitor::failureMonitor().onDisconnectOrFailure(
interf.changeFeedStream.getEndpoint()))) {}
}
}
} catch (Error& e) {

View File

@ -1930,13 +1930,13 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
}
ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
if (BW_DEBUG) {
printf("BW %s processing blobGranuleFileRequest for range [%s-%s) @ %lld\n",
bwData->id.toString().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.readVersion);
}
/*if (BW_DEBUG) {
printf("BW %s processing blobGranuleFileRequest for range [%s-%s) @ %lld\n",
bwData->id.toString().c_str(),
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str(),
req.readVersion);
}*/
try {
// TODO REMOVE in api V2