fixed empty version merge logic

This commit is contained in:
Evan Tschannen 2021-11-18 22:02:11 -08:00 committed by Jingyu Zhou
parent 6d9f134bf0
commit 9df287ea06

View File

@ -6891,11 +6891,10 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
}
choose {
when(state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
state int resultLoc = 0;
// FIXME: handle empty versions properly
while (resultLoc < rep.mutations.size()) {
wait(results.onEmpty());
ASSERT(rep.mutations[resultLoc].version >= nextVersion);
results.send(rep.mutations[resultLoc]);
resultLoc++;
}
@ -6915,13 +6914,16 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
}
}
}
when(wait(atLatestVersion ? storageData->version.whenAtLeast(nextVersion) : Future<Void>(Never()))) {
wait(results.onEmpty());
when(wait(atLatestVersion && replyStream.isEmpty() && results.isEmpty()
? storageData->version.whenAtLeast(nextVersion)
: Future<Void>(Never()))) {
MutationsAndVersionRef empty;
empty.version = std::min(storageData->version.get(), end);
empty.version = storageData->version.get();
results.send(empty);
nextVersion = storageData->version.get() + 1;
}
when(wait(atLatestVersion && replyStream.isEmpty() && !results.isEmpty() ? results.onEmpty()
: Future<Void>(Never()))) {}
}
}
} catch (Error& e) {