fix: when a feed transitions from atLatest to not atLatest stall all updates at the blocked version

This commit is contained in:
Evan Tschannen 2021-11-17 22:27:43 -08:00 committed by Jingyu Zhou
parent 10a925b7e9
commit e97d337ffe

View File

@ -1871,6 +1871,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
state bool atLatest = false;
state UID streamUID = deterministicRandom()->randomUniqueID();
state bool removeUID = false;
state Optional<Version> blockedVersion;
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
wait(delay(0, TaskPriority::DefaultEndpoint));
@ -1880,14 +1881,14 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
Future<Void> onReady = req.reply.onReady();
if (atLatest && !onReady.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
data->version.get();
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
removeUID = true;
}
wait(onReady);
state Future<ChangeFeedStreamReply> feedReplyFuture = getChangeFeedMutations(data, req, false);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
data->prevVersion;
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
removeUID = true;
}
ChangeFeedStreamReply _feedReply = wait(feedReplyFuture);
@ -1908,13 +1909,15 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
minVersion = std::min(minVersion, it.second);
}
feedReply.atLatestVersion = atLatest;
feedReply.minStreamVersion = minVersion;
feedReply.minStreamVersion =
feedReply.mutations.back().mutations.empty() ? minVersion : feedReply.mutations.back().version;
req.reply.send(feedReply);
if (feedReply.mutations.back().version == req.end - 1) {
req.reply.sendError(end_of_stream());
return Void();
}
if (feedReply.mutations.back().mutations.empty()) {
blockedVersion = Optional<Version>();
auto feed = data->uidChangeFeed.find(req.rangeID);
if (feed == data->uidChangeFeed.end() || feed->second->removing) {
req.reply.sendError(unknown_change_feed());
@ -1928,6 +1931,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
req.reply.sendError(unknown_change_feed());
return Void();
}
} else {
blockedVersion = feedReply.mutations.back().version;
}
}
} catch (Error& e) {