From 917c7acca54ff4b6667a04f0dc421225eb939b0d Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Mon, 21 Mar 2022 22:21:40 -0500 Subject: [PATCH] Using desiredOldest for when change feed metadata is safe --- fdbserver/BlobWorker.actor.cpp | 7 ++++--- fdbserver/storageserver.actor.cpp | 14 +++++++------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 1e3e4232cf..07000fb4e7 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -1261,7 +1261,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata->keyRange.end.printable(), metadata->originalEpoch, metadata->originalSeqno); - fmt::print(" CFID: {}\n", startState.granuleID.toString()); + fmt::print(" CFID: {} ({})\n", startState.granuleID.toString(), cfKey.printable()); fmt::print(" CF Start Version: {}\n", startState.changeFeedStartVersion); fmt::print(" Previous Durable Version: {}\n", startState.previousDurableVersion); fmt::print(" doSnapshot={}\n", startState.doSnapshot ? "T" : "F"); @@ -2427,6 +2427,7 @@ ACTOR Future openGranule(Reference bwData, As ASSERT(req.type != AssignRequestType::Continue); state Transaction tr(bwData->db); state Key lockKey = blobGranuleLockKeyFor(req.keyRange); + state UID newGranuleID = deterministicRandom()->randomUniqueID(); if (BW_DEBUG) { fmt::print("{0} [{1} - {2}) opening\n", @@ -2491,8 +2492,8 @@ ACTOR Future openGranule(Reference bwData, As info.granuleID = info.history.get().value.granuleID; } else { // FIXME: could avoid max uid for granule ids here - // if this granule is not derived from a split or merge, create the granule id here - info.granuleID = deterministicRandom()->randomUniqueID(); + // if this granule is not derived from a split or merge, use new granule id + info.granuleID = newGranuleID; } wait(updateChangeFeed( &tr, granuleIDToCFKey(info.granuleID), ChangeFeedStatus::CHANGE_FEED_CREATE, req.keyRange)); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 20f337dd5e..c06dfb8951 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1943,7 +1943,7 @@ ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChang return Void(); } - Version knownCommittedRequired = invalidVersion; + Version metadataVersion = invalidVersion; auto ranges = data->keyChangeFeed.intersectingRanges(req.range); std::map> rangeIds; @@ -1952,12 +1952,12 @@ ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChang // Can't tell other SS about a change feed create or stopVersion that may get rolled back, and we only need // to tell it about the metadata if req.minVersion > metadataVersion, since it will get the information from // its own private mutations if it hasn't processed up that version yet - knownCommittedRequired = std::max(knownCommittedRequired, it->metadataCreateVersion); + metadataVersion = std::max(metadataVersion, it->metadataCreateVersion); Version stopVersion; if (it->stopVersion != MAX_VERSION && req.minVersion > it->stopVersion) { stopVersion = it->stopVersion; - knownCommittedRequired = std::max(knownCommittedRequired, stopVersion); + metadataVersion = std::max(metadataVersion, stopVersion); } else { stopVersion = MAX_VERSION; } @@ -1971,10 +1971,10 @@ ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChang it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second))); } - // Make sure all of the stop versions we are sending aren't going to get rolled back - if (knownCommittedRequired != invalidVersion && knownCommittedRequired > data->knownCommittedVersion.get()) { - TEST(true); // overlapping change feeds waiting for stop version to be committed - wait(data->knownCommittedVersion.whenAtLeast(knownCommittedRequired)); + // Make sure all of the metadata we are sending won't get rolled back + if (metadataVersion != invalidVersion && metadataVersion > data->knownCommittedVersion.get()) { + TEST(true); // overlapping change feeds waiting for metadata version to be committed + wait(data->desiredOldestVersion.whenAtLeast(metadataVersion)); } req.reply.send(reply); return Void();