mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 02:37:02 +08:00
Using desiredOldest for when change feed metadata is safe
This commit is contained in:
parent
7b3a65676b
commit
917c7acca5
@ -1261,7 +1261,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
||||
metadata->keyRange.end.printable(),
|
||||
metadata->originalEpoch,
|
||||
metadata->originalSeqno);
|
||||
fmt::print(" CFID: {}\n", startState.granuleID.toString());
|
||||
fmt::print(" CFID: {} ({})\n", startState.granuleID.toString(), cfKey.printable());
|
||||
fmt::print(" CF Start Version: {}\n", startState.changeFeedStartVersion);
|
||||
fmt::print(" Previous Durable Version: {}\n", startState.previousDurableVersion);
|
||||
fmt::print(" doSnapshot={}\n", startState.doSnapshot ? "T" : "F");
|
||||
@ -2427,6 +2427,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
||||
ASSERT(req.type != AssignRequestType::Continue);
|
||||
state Transaction tr(bwData->db);
|
||||
state Key lockKey = blobGranuleLockKeyFor(req.keyRange);
|
||||
state UID newGranuleID = deterministicRandom()->randomUniqueID();
|
||||
|
||||
if (BW_DEBUG) {
|
||||
fmt::print("{0} [{1} - {2}) opening\n",
|
||||
@ -2491,8 +2492,8 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
||||
info.granuleID = info.history.get().value.granuleID;
|
||||
} else {
|
||||
// FIXME: could avoid max uid for granule ids here
|
||||
// if this granule is not derived from a split or merge, create the granule id here
|
||||
info.granuleID = deterministicRandom()->randomUniqueID();
|
||||
// if this granule is not derived from a split or merge, use new granule id
|
||||
info.granuleID = newGranuleID;
|
||||
}
|
||||
wait(updateChangeFeed(
|
||||
&tr, granuleIDToCFKey(info.granuleID), ChangeFeedStatus::CHANGE_FEED_CREATE, req.keyRange));
|
||||
|
@ -1943,7 +1943,7 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
|
||||
return Void();
|
||||
}
|
||||
|
||||
Version knownCommittedRequired = invalidVersion;
|
||||
Version metadataVersion = invalidVersion;
|
||||
|
||||
auto ranges = data->keyChangeFeed.intersectingRanges(req.range);
|
||||
std::map<Key, std::tuple<KeyRange, Version, Version>> rangeIds;
|
||||
@ -1952,12 +1952,12 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
|
||||
// Can't tell other SS about a change feed create or stopVersion that may get rolled back, and we only need
|
||||
// to tell it about the metadata if req.minVersion > metadataVersion, since it will get the information from
|
||||
// its own private mutations if it hasn't processed up that version yet
|
||||
knownCommittedRequired = std::max(knownCommittedRequired, it->metadataCreateVersion);
|
||||
metadataVersion = std::max(metadataVersion, it->metadataCreateVersion);
|
||||
|
||||
Version stopVersion;
|
||||
if (it->stopVersion != MAX_VERSION && req.minVersion > it->stopVersion) {
|
||||
stopVersion = it->stopVersion;
|
||||
knownCommittedRequired = std::max(knownCommittedRequired, stopVersion);
|
||||
metadataVersion = std::max(metadataVersion, stopVersion);
|
||||
} else {
|
||||
stopVersion = MAX_VERSION;
|
||||
}
|
||||
@ -1971,10 +1971,10 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
|
||||
it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second)));
|
||||
}
|
||||
|
||||
// Make sure all of the stop versions we are sending aren't going to get rolled back
|
||||
if (knownCommittedRequired != invalidVersion && knownCommittedRequired > data->knownCommittedVersion.get()) {
|
||||
TEST(true); // overlapping change feeds waiting for stop version to be committed
|
||||
wait(data->knownCommittedVersion.whenAtLeast(knownCommittedRequired));
|
||||
// Make sure all of the metadata we are sending won't get rolled back
|
||||
if (metadataVersion != invalidVersion && metadataVersion > data->knownCommittedVersion.get()) {
|
||||
TEST(true); // overlapping change feeds waiting for metadata version to be committed
|
||||
wait(data->desiredOldestVersion.whenAtLeast(metadataVersion));
|
||||
}
|
||||
req.reply.send(reply);
|
||||
return Void();
|
||||
|
Loading…
x
Reference in New Issue
Block a user