Adding explicit popVersion to change feed reply, like tlogs have

This commit is contained in:
Josh Slocum 2022-02-24 15:09:38 -06:00
parent 43543b0d0e
commit e5b4fb3d80
5 changed files with 41 additions and 0 deletions

View File

@ -200,6 +200,8 @@ struct ChangeFeedData : ReferenceCounted<ChangeFeedData> {
Promise<Void> refresh; Promise<Void> refresh;
Version maxSeenVersion; Version maxSeenVersion;
Version endVersion = invalidVersion; Version endVersion = invalidVersion;
Version popVersion =
invalidVersion; // like TLog pop version, set by SS and client can check it to see if they missed data
ChangeFeedData() : notAtLatest(1) {} ChangeFeedData() : notAtLatest(1) {}
}; };

View File

@ -7466,6 +7466,9 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
if (rep.mutations.back().version > feedData->maxSeenVersion) { if (rep.mutations.back().version > feedData->maxSeenVersion) {
feedData->maxSeenVersion = rep.mutations.back().version; feedData->maxSeenVersion = rep.mutations.back().version;
} }
if (rep.popVersion > feedData->popVersion) {
feedData->popVersion = rep.popVersion;
}
state int resultLoc = 0; state int resultLoc = 0;
while (resultLoc < rep.mutations.size()) { while (resultLoc < rep.mutations.size()) {
@ -7836,6 +7839,10 @@ ACTOR Future<Void> doSingleCFStream(KeyRange range,
state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture()); state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture());
*begin = feedReply.mutations.back().version + 1; *begin = feedReply.mutations.back().version + 1;
if (feedReply.popVersion > results->popVersion) {
results->popVersion = feedReply.popVersion;
}
// don't send completely empty set of mutations to promise stream // don't send completely empty set of mutations to promise stream
bool anyMutations = false; bool anyMutations = false;
for (auto& it : feedReply.mutations) { for (auto& it : feedReply.mutations) {

View File

@ -690,6 +690,7 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
VectorRef<MutationsAndVersionRef> mutations; VectorRef<MutationsAndVersionRef> mutations;
bool atLatestVersion = false; bool atLatestVersion = false;
Version minStreamVersion = invalidVersion; Version minStreamVersion = invalidVersion;
Version popVersion = invalidVersion;
ChangeFeedStreamReply() {} ChangeFeedStreamReply() {}
@ -703,6 +704,7 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply {
mutations, mutations,
atLatestVersion, atLatestVersion,
minStreamVersion, minStreamVersion,
popVersion,
arena); arena);
} }
}; };

View File

@ -1473,6 +1473,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
mutations.front().version); mutations.front().version);
} }
ASSERT(mutations.front().version > metadata->waitForVersionReturned); ASSERT(mutations.front().version > metadata->waitForVersionReturned);
// If this assert trips we should have gotten change_feed_popped from SS and didn't
ASSERT(mutations.front().version >= metadata->activeCFData.get()->popVersion);
} }
when(wait(inFlightFiles.empty() ? Never() : success(inFlightFiles.front().future))) { when(wait(inFlightFiles.empty() ? Never() : success(inFlightFiles.front().future))) {
// TODO REMOVE // TODO REMOVE

View File

@ -2229,6 +2229,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
} }
} }
reply.popVersion = feedInfo->emptyVersion + 1;
// If the SS's version advanced at all during any of the waits, the read from memory may have missed some // If the SS's version advanced at all during any of the waits, the read from memory may have missed some
// mutations, so gotAll can only be true if data->version didn't change over the course of this actor // mutations, so gotAll can only be true if data->version didn't change over the course of this actor
return std::make_pair(reply, gotAll); return std::make_pair(reply, gotAll);
@ -4529,6 +4531,19 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
state int remoteLoc = 0; state int remoteLoc = 0;
while (remoteLoc < remoteResult.size()) { while (remoteLoc < remoteResult.size()) {
if (feedResults->popVersion - 1 > changeFeedInfo->emptyVersion) {
TEST(true); // CF fetched updated popped version from src SS
changeFeedInfo->emptyVersion = feedResults->popVersion - 1;
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
changeFeedSSValue(changeFeedInfo->range,
changeFeedInfo->emptyVersion + 1,
changeFeedInfo->stopVersion)));
}
Version localVersion = localResult.version; Version localVersion = localResult.version;
Version remoteVersion = remoteResult[remoteLoc].version; Version remoteVersion = remoteResult[remoteLoc].version;
if (remoteVersion <= localVersion) { if (remoteVersion <= localVersion) {
@ -4609,6 +4624,18 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
} }
} }
if (feedResults->popVersion - 1 > changeFeedInfo->emptyVersion) {
TEST(true); // CF fetched updated popped version from src SS at end
changeFeedInfo->emptyVersion = feedResults->popVersion - 1;
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
changeFeedSSValue(
changeFeedInfo->range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
}
// if we were popped or removed while fetching but it didn't pass the fetch version while writing, clean up here // if we were popped or removed while fetching but it didn't pass the fetch version while writing, clean up here
if (versionsFetched > 0 && startVersion < changeFeedInfo->emptyVersion) { if (versionsFetched > 0 && startVersion < changeFeedInfo->emptyVersion) {
ASSERT(firstVersion != invalidVersion); ASSERT(firstVersion != invalidVersion);