diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 67e45c7178..e022d2512f 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -4225,6 +4225,10 @@ ACTOR Future<Void> handleFlushGranuleReq(Reference<BlobWorkerData> self, FlushGr // force granule to flush at this version, and wait if (req.flushVersion > metadata->pendingDeltaVersion) { // first, wait for granule active + if (!metadata->activeCFData.get().isValid()) { + req.reply.sendError(wrong_shard_server()); + return Void(); + } // wait for change feed version to catch up to ensure we have all data if (metadata->activeCFData.get()->getVersion() < req.flushVersion) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 42403a1a36..cca3687c10 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2319,8 +2319,8 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang } // Make sure all of the metadata we are sending won't get rolled back - if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->knownCommittedVersion.get()) { - CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be committed"); + if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->desiredOldestVersion.get()) { + CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be safe from rollback"); wait(data->desiredOldestVersion.whenAtLeast(metadataWaitVersion)); } req.reply.send(reply); @@ -2521,6 +2521,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor state Version dequeVersion = data->version.get(); state Version dequeKnownCommit = data->knownCommittedVersion.get(); state Version emptyVersion = feedInfo->emptyVersion; + state Version durableValidationVersion = std::min(data->durableVersion.get(), feedInfo->durableFetchVersion.get()); Version fetchStorageVersion = std::max(feedInfo->fetchVersion, feedInfo->durableFetchVersion.get()); if (DEBUG_CF_TRACE) { @@ -2537,7 +2538,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor .detail("DurableVersion", feedInfo->durableVersion) .detail("FetchStorageVersion", fetchStorageVersion) .detail("FetchVersion", feedInfo->fetchVersion) - .detail("DurableFetchVersion", feedInfo->durableFetchVersion.get()); + .detail("DurableFetchVersion", feedInfo->durableFetchVersion.get()) + .detail("DurableValidationVersion", durableValidationVersion); } if (req.end > emptyVersion + 1) { @@ -2652,18 +2654,19 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor } } else if (memoryVerifyIdx < memoryReply.mutations.size() && version == memoryReply.mutations[memoryVerifyIdx].version) { - if (version > feedInfo->storageVersion && version > feedInfo->fetchVersion) { + if (version > durableValidationVersion) { // Another validation case - feed was popped, data was fetched, fetched data was persisted but pop // wasn't yet, then SS restarted. Now SS has the data without the popped version. This looks wrong // here but is fine. memoryVerifyIdx++; } else { - fmt::print( - "ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n", - data->thisServerID.toString().substr(0, 4), - req.rangeID.printable().substr(0, 6), - streamUID.toString().substr(0, 8), - version); + fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on " + "disk! (durable validation = {4})\n", + data->thisServerID.toString().substr(0, 4), + req.rangeID.printable().substr(0, 6), + streamUID.toString().substr(0, 8), + version, + durableValidationVersion); fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size()); for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) { @@ -2681,7 +2684,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str()); } } - ASSERT(false); + ASSERT_WE_THINK(false); } } remainingDurableBytes -=