send merge cursor's buffered data if all sub-streams have something at this version

This commit is contained in:
Josh Slocum 2021-12-21 16:32:56 -06:00
parent b0aea91895
commit 738b72918a
2 changed files with 39 additions and 3 deletions

View File

@ -7360,12 +7360,16 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
}
interfNum++;
}
state int atCheckVersion = 0;
state Version checkVersion = invalidVersion;
state Standalone<VectorRef<MutationsAndVersionRef>> nextOut;
while (mutations.size()) {
state MutationAndVersionStream nextStream = mutations.top();
mutations.pop();
ASSERT(nextStream.next.version >= checkVersion);
if (nextStream.next.version == checkVersion) {
ASSERT(atCheckVersion > 0);
}
if (nextStream.next.version != checkVersion) {
if (nextOut.size()) {
*begin = checkVersion + 1;
@ -7394,6 +7398,7 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
}
checkVersion = nextStream.next.version;
atCheckVersion = 0;
}
if (nextOut.size() && nextStream.next.version == nextOut.back().version) {
if (nextStream.next.mutations.size() &&
@ -7404,9 +7409,41 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
} else {
nextOut.push_back_deep(nextOut.arena(), nextStream.next);
}
atCheckVersion++;
// TODO AVOID CODE DUPLICATION
// If all streams have returned something at this version, we know it is complete.
if (atCheckVersion == mutations.size() + 1) {
ASSERT(nextOut.size() == 1);
*begin = checkVersion + 1;
if (DEBUG_CF_VERSION(nextOut.back().version)) {
fmt::print("CFNA (merged@all): {0} (1)\n", nextOut.back().version);
}
if (nextOut.back().version < results->lastReturnedVersion.get()) {
printf("ERROR: merge cursor@all pushing next out <= lastReturnedVersion");
}
// We can get an empty version pushed through the stream if whenAtLeast is called. Ignore
// it
if (!nextOut.back().mutations.empty()) {
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
wait(results->mutations.onEmpty());
wait(delay(0));
}
if (DEBUG_CF_VERSION(nextOut.back().version)) {
fmt::print("CFLR (merged@all): {0} (1)\n", nextOut.back().version);
}
if (nextOut.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(nextOut.back().version);
}
nextOut = Standalone<VectorRef<MutationsAndVersionRef>>();
atCheckVersion = 0;
}
try {
Standalone<MutationsAndVersionRef> res = waitNext(nextStream.results.getFuture());
if (DEBUG_CF_VERSION(nextOut.back().version)) {
if (DEBUG_CF_VERSION(res.version)) {
fmt::print(" CFNA (merge1): {0} (1)\n", res.version, res.mutations.size());
}
nextStream.next = res;

View File

@ -716,6 +716,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
// files might not be the current set of files in metadata, in the case of doing the initial snapshot of a granule that
// was split.
// FIXME: only pass metadata->keyRange
ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata,
UID granuleID,
@ -728,8 +729,6 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
metadata->keyRange.end.printable().c_str());
}
// FIXME: don't use metadata->files
ASSERT(!files.snapshotFiles.empty());
ASSERT(!files.deltaFiles.empty());