More cf bug fixes (#7789)

* Fixing change feed fetch and rollback race

* Fixing validation issue for change feed validation

* Fixing shutdown segfault in blob worker
This commit is contained in:
Josh Slocum 2022-08-04 15:57:42 -05:00 committed by GitHub
parent 84d483605b
commit d721d1b850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 11 deletions

View File

@ -4225,6 +4225,10 @@ ACTOR Future<Void> handleFlushGranuleReq(Reference<BlobWorkerData> self, FlushGr
// force granule to flush at this version, and wait
if (req.flushVersion > metadata->pendingDeltaVersion) {
// first, wait for granule active
if (!metadata->activeCFData.get().isValid()) {
req.reply.sendError(wrong_shard_server());
return Void();
}
// wait for change feed version to catch up to ensure we have all data
if (metadata->activeCFData.get()->getVersion() < req.flushVersion) {

View File

@ -2319,8 +2319,8 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
}
// Make sure all of the metadata we are sending won't get rolled back
if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->knownCommittedVersion.get()) {
CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be committed");
if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->desiredOldestVersion.get()) {
CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be safe from rollback");
wait(data->desiredOldestVersion.whenAtLeast(metadataWaitVersion));
}
req.reply.send(reply);
@ -2521,6 +2521,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Version dequeVersion = data->version.get();
state Version dequeKnownCommit = data->knownCommittedVersion.get();
state Version emptyVersion = feedInfo->emptyVersion;
state Version durableValidationVersion = std::min(data->durableVersion.get(), feedInfo->durableFetchVersion.get());
Version fetchStorageVersion = std::max(feedInfo->fetchVersion, feedInfo->durableFetchVersion.get());
if (DEBUG_CF_TRACE) {
@ -2537,7 +2538,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("DurableVersion", feedInfo->durableVersion)
.detail("FetchStorageVersion", fetchStorageVersion)
.detail("FetchVersion", feedInfo->fetchVersion)
.detail("DurableFetchVersion", feedInfo->durableFetchVersion.get());
.detail("DurableFetchVersion", feedInfo->durableFetchVersion.get())
.detail("DurableValidationVersion", durableValidationVersion);
}
if (req.end > emptyVersion + 1) {
@ -2652,18 +2654,19 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
} else if (memoryVerifyIdx < memoryReply.mutations.size() &&
version == memoryReply.mutations[memoryVerifyIdx].version) {
if (version > feedInfo->storageVersion && version > feedInfo->fetchVersion) {
if (version > durableValidationVersion) {
// Another validation case - feed was popped, data was fetched, fetched data was persisted but pop
// wasn't yet, then SS restarted. Now SS has the data without the popped version. This looks wrong
// here but is fine.
memoryVerifyIdx++;
} else {
fmt::print(
"ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
version);
fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on "
"disk! (durable validation = {4})\n",
data->thisServerID.toString().substr(0, 4),
req.rangeID.printable().substr(0, 6),
streamUID.toString().substr(0, 8),
version,
durableValidationVersion);
fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
@ -2681,7 +2684,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
}
}
ASSERT(false);
ASSERT_WE_THINK(false);
}
}
remainingDurableBytes -=