properly handle an active change feed stream when removed

This commit is contained in:
Evan Tschannen 2021-11-17 21:40:04 -08:00 committed by Jingyu Zhou
parent 11d5cae515
commit f3bb1d8f51

View File

@ -345,6 +345,7 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
Key id;
AsyncTrigger newMutations;
bool stopped = false; // A stopped change feed no longer adds new mutations, but is still queriable
bool removing = false;
};
class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
@ -1915,13 +1916,18 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
}
if (feedReply.mutations.back().mutations.empty()) {
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end()) {
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
req.reply.sendError(unknown_change_feed());
return Void();
}
wait(feed->second->newMutations
.onTrigger()); // FIXME: check that this is triggered when the range is moved to a different
// server, also check that the stream is closed
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
req.reply.sendError(unknown_change_feed());
return Void();
}
}
}
} catch (Error& e) {
@ -4615,7 +4621,12 @@ void changeServerKeys(StorageServer* data,
}
}
}
data->uidChangeFeed.erase(f.first);
auto feed = data->uidChangeFeed.find(f.first);
if (feed != data->uidChangeFeed.end()) {
feed->second->removing = true;
feed->second->newMutations.trigger();
data->uidChangeFeed.erase(feed);
}
}
}
}