Merge pull request #7639 from sfc-gh-jslocum/cf_metadata_rewrite

Change Feed Metadata Rewrite and adding targeted fault injection
Josh Slocum 2022-07-26 18:10:37 -05:00 committed by GitHub
commit 77956dc7ae
13 changed files with 395 additions and 221 deletions

View File

@@ -9409,11 +9409,20 @@ Future<Void> DatabaseContext::getChangeFeedStream(Reference<ChangeFeedData> results
     Reference<DatabaseContext>::addRef(this), results, rangeID, begin, end, range, replyBufferSize, canReadPopped);
 }

-ACTOR Future<std::vector<OverlappingChangeFeedEntry>> singleLocationOverlappingChangeFeeds(
-    Database cx,
-    Reference<LocationInfo> location,
-    KeyRangeRef range,
-    Version minVersion) {
+Version OverlappingChangeFeedsInfo::getFeedMetadataVersion(const KeyRangeRef& range) const {
+    Version v = invalidVersion;
+    for (auto& it : feedMetadataVersions) {
+        if (it.second > v && it.first.intersects(range)) {
+            v = it.second;
+        }
+    }
+    return v;
+}
+
+ACTOR Future<OverlappingChangeFeedsReply> singleLocationOverlappingChangeFeeds(Database cx,
+                                                                               Reference<LocationInfo> location,
+                                                                               KeyRangeRef range,
+                                                                               Version minVersion) {
     state OverlappingChangeFeedsRequest req;
     req.range = range;
     req.minVersion = minVersion;
@@ -9425,16 +9434,16 @@ ACTOR Future<OverlappingChangeFeedsReply> singleLocationOverlappingChangeFeeds(Database cx,
                                          TaskPriority::DefaultPromiseEndpoint,
                                          AtMostOnce::False,
                                          cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
-    return rep.rangeIds;
+    return rep;
 }

 bool compareChangeFeedResult(const OverlappingChangeFeedEntry& i, const OverlappingChangeFeedEntry& j) {
-    return i.rangeId < j.rangeId;
+    return i.feedId < j.feedId;
 }

-ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
+ACTOR Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
                                                                          KeyRangeRef range,
                                                                          Version minVersion) {
     state Database cx(db);
     state Span span("NAPI:GetOverlappingChangeFeeds"_loc);
@@ -9460,19 +9469,33 @@ ACTOR Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
             throw all_alternatives_failed();
         }

-        state std::vector<Future<std::vector<OverlappingChangeFeedEntry>>> allOverlappingRequests;
+        state std::vector<Future<OverlappingChangeFeedsReply>> allOverlappingRequests;
         for (auto& it : locations) {
             allOverlappingRequests.push_back(
                 singleLocationOverlappingChangeFeeds(cx, it.locations, it.range & range, minVersion));
         }

         wait(waitForAll(allOverlappingRequests));

-        std::vector<OverlappingChangeFeedEntry> result;
-        for (auto& it : allOverlappingRequests) {
-            result.insert(result.end(), it.get().begin(), it.get().end());
+        OverlappingChangeFeedsInfo result;
+        std::unordered_map<KeyRef, OverlappingChangeFeedEntry> latestFeedMetadata;
+        for (int i = 0; i < locations.size(); i++) {
+            result.arena.dependsOn(allOverlappingRequests[i].get().arena);
+            result.arena.dependsOn(locations[i].range.arena());
+            result.feedMetadataVersions.push_back(
+                { locations[i].range, allOverlappingRequests[i].get().feedMetadataVersion });
+            for (auto& it : allOverlappingRequests[i].get().feeds) {
+                auto res = latestFeedMetadata.insert({ it.feedId, it });
+                if (!res.second) {
+                    CODE_PROBE(true, "deduping fetched overlapping feed by higher metadata version");
+                    if (res.first->second.feedMetadataVersion < it.feedMetadataVersion) {
+                        res.first->second = it;
+                    }
+                }
+            }
+        }
+        for (auto& it : latestFeedMetadata) {
+            result.feeds.push_back(result.arena, it.second);
         }
-        std::sort(result.begin(), result.end(), compareChangeFeedResult);
-        result.resize(std::unique(result.begin(), result.end()) - result.begin());
         return result;
     } catch (Error& e) {
         if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
@@ -9485,8 +9508,7 @@ ACTOR Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeedsActor(Reference<DatabaseContext> db,
     }
 }

-Future<std::vector<OverlappingChangeFeedEntry>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
-                                                                                           Version minVersion) {
+Future<OverlappingChangeFeedsInfo> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range, Version minVersion) {
     return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
 }
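Editorial note on the merge loop above: each storage location now replies with its own feedMetadataVersion, the client records one (range, version) pair per location, and feeds returned by more than one location are deduplicated by keeping the copy with the higher feedMetadataVersion. A minimal standalone sketch of how the recorded pairs are later queried, with illustrative key and version values that are not from the patch:

// Two locations covering ["a","c") and ["c","f") replied at metadata versions 100 and 120.
// getFeedMetadataVersion() returns the maximum version among the recorded ranges that intersect
// the queried feed range, so a feed spanning the shard boundary reports 120 here.
OverlappingChangeFeedsInfo info;
info.feedMetadataVersions = { { KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("c")), 100 },
                              { KeyRangeRef(LiteralStringRef("c"), LiteralStringRef("f")), 120 } };
ASSERT(info.getFeedMetadataVersion(KeyRangeRef(LiteralStringRef("b"), LiteralStringRef("d"))) == 120);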

View File

@@ -207,6 +207,16 @@ struct KeyRangeLocationInfo {
       : tenantEntry(tenantEntry), range(range), locations(locations) {}
 };

+struct OverlappingChangeFeedsInfo {
+    Arena arena;
+    VectorRef<OverlappingChangeFeedEntry> feeds;
+    // would prefer to use key range map but it complicates copy/move constructors
+    std::vector<std::pair<KeyRangeRef, Version>> feedMetadataVersions;
+
+    // for a feed that wasn't present, returns the metadata version it would have been fetched at.
+    Version getFeedMetadataVersion(const KeyRangeRef& feedRange) const;
+};
+
 class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
 public:
     static DatabaseContext* allocateOnForeignThread() {
@@ -361,7 +371,7 @@ public:
                                           int replyBufferSize = -1,
                                           bool canReadPopped = true);
-    Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
+    Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
     Future<Void> popChangeFeedMutations(Key rangeID, Version version);

     Future<Key> purgeBlobGranules(KeyRange keyRange,
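For callers, the visible API change is the return type of getOverlappingChangeFeeds. A hedged sketch of the new call-site shape inside a flow ACTOR (the locals `keys`, `fetchVersion`, and `someFeedRange` are illustrative, not from the patch), matching how the storage server consumes it later in this commit:

// Before: callers received a flat, deduplicated std::vector<OverlappingChangeFeedEntry>.
// After: the reply also carries per-location metadata versions, so a caller can ask what
// metadata version a feed that was *not* returned would have been fetched at.
state OverlappingChangeFeedsInfo feedMetadata = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
for (auto& cfEntry : feedMetadata.feeds) {
    // create or refresh local state for each feed that still exists
}
Version wouldHaveFetchedAt = feedMetadata.getFeedMetadataVersion(someFeedRange);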

View File

@@ -970,39 +970,51 @@ struct FetchCheckpointKeyValuesRequest {
 };

 struct OverlappingChangeFeedEntry {
-    Key rangeId;
-    KeyRange range;
+    KeyRef feedId;
+    KeyRangeRef range;
     Version emptyVersion;
     Version stopVersion;
+    Version feedMetadataVersion;

     bool operator==(const OverlappingChangeFeedEntry& r) const {
-        return rangeId == r.rangeId && range == r.range && emptyVersion == r.emptyVersion &&
-               stopVersion == r.stopVersion;
+        return feedId == r.feedId && range == r.range && emptyVersion == r.emptyVersion &&
+               stopVersion == r.stopVersion && feedMetadataVersion == r.feedMetadataVersion;
     }

     OverlappingChangeFeedEntry() {}
-    OverlappingChangeFeedEntry(Key const& rangeId, KeyRange const& range, Version emptyVersion, Version stopVersion)
-      : rangeId(rangeId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion) {}
+    OverlappingChangeFeedEntry(KeyRef const& feedId,
+                               KeyRangeRef const& range,
+                               Version emptyVersion,
+                               Version stopVersion,
+                               Version feedMetadataVersion)
+      : feedId(feedId), range(range), emptyVersion(emptyVersion), stopVersion(stopVersion),
+        feedMetadataVersion(feedMetadataVersion) {}
+
+    OverlappingChangeFeedEntry(Arena& arena, const OverlappingChangeFeedEntry& rhs)
+      : feedId(arena, rhs.feedId), range(arena, rhs.range), emptyVersion(rhs.emptyVersion),
+        stopVersion(rhs.stopVersion), feedMetadataVersion(rhs.feedMetadataVersion) {}

     template <class Ar>
     void serialize(Ar& ar) {
-        serializer(ar, rangeId, range, emptyVersion, stopVersion);
+        serializer(ar, feedId, range, emptyVersion, stopVersion, feedMetadataVersion);
     }
 };

 struct OverlappingChangeFeedsReply {
     constexpr static FileIdentifier file_identifier = 11815134;
-    std::vector<OverlappingChangeFeedEntry> rangeIds;
+    VectorRef<OverlappingChangeFeedEntry> feeds;
     bool cached;
     Arena arena;
+    Version feedMetadataVersion;

-    OverlappingChangeFeedsReply() : cached(false) {}
-    explicit OverlappingChangeFeedsReply(std::vector<OverlappingChangeFeedEntry> const& rangeIds)
-      : rangeIds(rangeIds), cached(false) {}
+    OverlappingChangeFeedsReply() : cached(false), feedMetadataVersion(invalidVersion) {}
+    explicit OverlappingChangeFeedsReply(VectorRef<OverlappingChangeFeedEntry> const& feeds,
+                                         Version feedMetadataVersion)
+      : feeds(feeds), cached(false), feedMetadataVersion(feedMetadataVersion) {}

     template <class Ar>
     void serialize(Ar& ar) {
-        serializer(ar, rangeIds, arena);
+        serializer(ar, feeds, arena, feedMetadataVersion);
     }
 };

View File

@@ -23,6 +23,7 @@
 #include "flow/ProtocolVersion.h"
 #include <algorithm>
 #include <string>
+#include <limits>

 #pragma once

 #include "flow/flow.h"
@@ -469,6 +470,8 @@ public:
     bool setDiffProtocol; // true if a process with a different protocol version has been started
     bool allowStorageMigrationTypeChange = false;
+    double injectTargetedSSRestartTime = std::numeric_limits<double>::max();
+    double injectSSDelayTime = std::numeric_limits<double>::max();

     flowGlobalType global(int id) const final { return getCurrentProcess()->global(id); };
     void setGlobal(size_t id, flowGlobalType v) final { getCurrentProcess()->setGlobal(id, v); };

View File

@@ -1650,7 +1650,9 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
             state Key lockKey = blobGranuleLockKeyFor(parentRange);
             state Future<Optional<Value>> oldLockFuture = tr->get(lockKey);

-            wait(updateChangeFeed(tr,
+            // This has to be
+            // TODO: fix this better! (privatize change feed key clear)
+            wait(updateChangeFeed(&tr->getTransaction(),
                                   granuleIDToCFKey(parentGranuleIDs[parentIdx]),
                                   ChangeFeedStatus::CHANGE_FEED_DESTROY,
                                   parentRange));

View File

@@ -280,6 +280,13 @@ class TestConfig {
             if (attrib == "blobGranulesEnabled") {
                 blobGranulesEnabled = strcmp(value.c_str(), "true") == 0;
             }
+            if (attrib == "injectSSTargetedRestart") {
+                injectTargetedSSRestart = strcmp(value.c_str(), "true") == 0;
+            }
+
+            if (attrib == "injectSSDelay") {
+                injectSSDelay = strcmp(value.c_str(), "true") == 0;
+            }
         }

         ifs.close();
@@ -327,6 +334,8 @@ public:
     bool allowDefaultTenant = true;
     bool allowDisablingTenants = true;
+    bool injectTargetedSSRestart = false;
+    bool injectSSDelay = false;

     ConfigDBType getConfigDBType() const { return configDBType; }
@@ -384,7 +393,9 @@ public:
             .add("blobGranulesEnabled", &blobGranulesEnabled)
             .add("allowDefaultTenant", &allowDefaultTenant)
             .add("allowDisablingTenants", &allowDisablingTenants)
-            .add("randomlyRenameZoneId", &randomlyRenameZoneId);
+            .add("randomlyRenameZoneId", &randomlyRenameZoneId)
+            .add("injectTargetedSSRestart", &injectTargetedSSRestart)
+            .add("injectSSDelay", &injectSSDelay);
         try {
             auto file = toml::parse(testFile);
             if (file.contains("configuration") && toml::find(file, "configuration").is_table()) {
@@ -2364,6 +2375,13 @@ ACTOR void setupAndRun(std::string dataFolder,
     testConfig.readFromConfig(testFile);
     g_simulator.hasDiffProtocolProcess = testConfig.startIncompatibleProcess;
     g_simulator.setDiffProtocol = false;
+    if (testConfig.injectTargetedSSRestart && deterministicRandom()->random01() < 0.25) {
+        g_simulator.injectTargetedSSRestartTime = 60.0 + 340.0 * deterministicRandom()->random01();
+    }
+
+    if (testConfig.injectSSDelay && deterministicRandom()->random01() < 0.25) {
+        g_simulator.injectSSDelayTime = 60.0 + 240.0 * deterministicRandom()->random01();
+    }

     // Build simulator allow list
     allowList.addTrustedSubnet("0.0.0.0/2"sv);

View File

@@ -536,6 +536,9 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
     Version storageVersion = invalidVersion; // The version between the storage version and the durable version are
                                              // being written to disk as part of the current commit in updateStorage.
     Version durableVersion = invalidVersion; // All versions before the durable version are durable on disk
+    // FIXME: this needs to get persisted to disk to still fix same races across restart!
+    Version metadataVersion = invalidVersion; // Last update to the change feed metadata. Used for reasoning about
+                                              // fetched metadata vs local metadata
     Version emptyVersion = 0; // The change feed does not have any mutations before emptyVersion
     KeyRange range;
     Key id;
@@ -551,8 +554,6 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
     bool removing = false;
     bool destroyed = false;
-    bool possiblyDestroyed = false;
-    bool refreshInProgress = false;

     KeyRangeMap<std::unordered_map<UID, Promise<Void>>> moveTriggers;
@@ -587,12 +588,21 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
     }

     void destroy(Version destroyVersion) {
+        updateMetadataVersion(destroyVersion);
         removing = true;
         destroyed = true;
-        refreshInProgress = false;
         moved(range);
         newMutations.trigger();
     }
+
+    bool updateMetadataVersion(Version version) {
+        // don't update metadata version if removing, so that metadata version remains the moved away version
+        if (!removing && version > metadataVersion) {
+            metadataVersion = version;
+            return true;
+        }
+        return false;
+    }
 };

 class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
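updateMetadataVersion returns whether the in-memory version actually advanced; later hunks in this commit use that boolean to decide whether the feed's persisted metadata row needs to be rewritten. A minimal sketch of the intended call pattern (variable names taken from the fetch path shown further below):

// Only schedule a mutation-log write when the metadata version actually moved forward. A feed
// that is 'removing' keeps its moved-away version, so updateMetadataVersion also returns false then.
bool addMutationToLog = false;
addMutationToLog |= changeFeedInfo->updateMetadataVersion(cfEntry.feedMetadataVersion);
if (addMutationToLog) {
    // persist the feed's row via the mutation log (see the fetchChangeFeedMetadata hunk below)
}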
@@ -895,7 +905,7 @@ public:
     KeyRangeMap<std::vector<Reference<ChangeFeedInfo>>> keyChangeFeed;
     std::map<Key, Reference<ChangeFeedInfo>> uidChangeFeed;
     Deque<std::pair<std::vector<Key>, Version>> changeFeedVersions;
-    std::map<UID, PromiseStream<Key>> changeFeedRemovals;
+    std::map<UID, PromiseStream<Key>> changeFeedDestroys;
     std::set<Key> currentChangeFeeds;
     std::set<Key> fetchingChangeFeeds;
     std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
@@ -1400,6 +1410,28 @@ public:
             req.reply.sendError(e);
         }
     }

+    void maybeInjectTargetedRestart(Version v) {
+        // inject an SS restart at most once per test
+        if (g_network->isSimulated() && !g_simulator.speedUpSimulation &&
+            now() > g_simulator.injectTargetedSSRestartTime &&
+            rebootAfterDurableVersion == std::numeric_limits<Version>::max()) {
+            CODE_PROBE(true, "Injecting SS targeted restart");
+            TraceEvent("SimSSInjectTargetedRestart", thisServerID).detail("Version", v);
+            rebootAfterDurableVersion = v;
+            g_simulator.injectTargetedSSRestartTime = std::numeric_limits<double>::max();
+        }
+    }
+
+    bool maybeInjectDelay() {
+        if (g_network->isSimulated() && !g_simulator.speedUpSimulation && now() > g_simulator.injectSSDelayTime) {
+            CODE_PROBE(true, "Injecting SS targeted delay");
+            TraceEvent("SimSSInjectDelay", thisServerID);
+            g_simulator.injectSSDelayTime = std::numeric_limits<double>::max();
+            return true;
+        }
+        return false;
+    }
 };

 const StringRef StorageServer::CurrentRunningFetchKeys::emptyString = LiteralStringRef("");
@@ -2212,46 +2244,54 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) {
         return Void();
     }

-    Version metadataVersion = invalidVersion;
+    Version metadataWaitVersion = invalidVersion;

     auto ranges = data->keyChangeFeed.intersectingRanges(req.range);
-    std::map<Key, std::tuple<KeyRange, Version, Version>> rangeIds;
+    std::map<Key, std::tuple<KeyRange, Version, Version, Version>> rangeIds;
     for (auto r : ranges) {
         for (auto& it : r.value()) {
             if (!it->removing) {
                 // Can't tell other SS about a change feed create or stopVersion that may get rolled back, and we only
                 // need to tell it about the metadata if req.minVersion > metadataVersion, since it will get the
                 // information from its own private mutations if it hasn't processed up that version yet
-                metadataVersion = std::max(metadataVersion, it->metadataCreateVersion);
+                metadataWaitVersion = std::max(metadataWaitVersion, it->metadataCreateVersion);
+
+                // don't wait for all it->metadataVersion updates, if metadata was fetched from elsewhere it's already
+                // durable, and some updates are unecessary to wait for
                 Version stopVersion;
                 if (it->stopVersion != MAX_VERSION && req.minVersion > it->stopVersion) {
                     stopVersion = it->stopVersion;
-                    metadataVersion = std::max(metadataVersion, stopVersion);
+                    metadataWaitVersion = std::max(metadataWaitVersion, stopVersion);
                 } else {
                     stopVersion = MAX_VERSION;
                 }
-                rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion);
+                rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion, it->metadataVersion);
             }
         }
     }
     state OverlappingChangeFeedsReply reply;
+    reply.feedMetadataVersion = data->version.get();
     for (auto& it : rangeIds) {
-        reply.rangeIds.push_back(OverlappingChangeFeedEntry(
-            it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second)));
+        reply.feeds.push_back_deep(reply.arena,
+                                   OverlappingChangeFeedEntry(it.first,
+                                                              std::get<0>(it.second),
+                                                              std::get<1>(it.second),
+                                                              std::get<2>(it.second),
+                                                              std::get<3>(it.second)));
         TraceEvent(SevDebug, "OverlappingChangeFeedEntry", data->thisServerID)
             .detail("MinVersion", req.minVersion)
            .detail("FeedID", it.first)
            .detail("Range", std::get<0>(it.second))
            .detail("EmptyVersion", std::get<1>(it.second))
-           .detail("StopVersion", std::get<2>(it.second));
+           .detail("StopVersion", std::get<2>(it.second))
+           .detail("FeedMetadataVersion", std::get<3>(it.second));
     }

     // Make sure all of the metadata we are sending won't get rolled back
-    if (metadataVersion != invalidVersion && metadataVersion > data->knownCommittedVersion.get()) {
+    if (metadataWaitVersion != invalidVersion && metadataWaitVersion > data->knownCommittedVersion.get()) {
         CODE_PROBE(true, "overlapping change feeds waiting for metadata version to be committed");
-        wait(data->desiredOldestVersion.whenAtLeast(metadataVersion));
+        wait(data->desiredOldestVersion.whenAtLeast(metadataWaitVersion));
     }
     req.reply.send(reply);
     return Void();
@@ -2584,21 +2624,37 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
                 }
             } else if (memoryVerifyIdx < memoryReply.mutations.size() &&
                        version == memoryReply.mutations[memoryVerifyIdx].version) {
-                fmt::print("ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
-                           data->thisServerID.toString().substr(0, 4),
-                           req.rangeID.printable().substr(0, 6),
-                           streamUID.toString().substr(0, 8),
-                           version);
-                fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
-                for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
-                    if (it.type == MutationRef::SetValue) {
-                        fmt::print(" {}=\n", it.param1.printable().c_str());
-                    } else {
-                        fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
+                if (version > feedInfo->storageVersion && version > feedInfo->fetchVersion) {
+                    // Another validation case - feed was popped, data was fetched, fetched data was persisted but pop
+                    // wasn't yet, then SS restarted. Now SS has the data without the popped version. This looks wrong
+                    // here but is fine.
+                    memoryVerifyIdx++;
+                } else {
+                    fmt::print(
+                        "ERROR: SS {0} CF {1} SQ {2} has mutation at {3} in memory but all filtered out on disk!\n",
+                        data->thisServerID.toString().substr(0, 4),
+                        req.rangeID.printable().substr(0, 6),
+                        streamUID.toString().substr(0, 8),
+                        version);
+                    fmt::print(" Memory: ({})\n", memoryReply.mutations[memoryVerifyIdx].mutations.size());
+                    for (auto& it : memoryReply.mutations[memoryVerifyIdx].mutations) {
+                        if (it.type == MutationRef::SetValue) {
+                            fmt::print(" {}=\n", it.param1.printable().c_str());
+                        } else {
+                            fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
+                        }
                     }
+                    fmt::print(" Disk(pre-filter): ({})\n", mutations.size());
+                    for (auto& it : mutations) {
+                        if (it.type == MutationRef::SetValue) {
+                            fmt::print(" {}=\n", it.param1.printable().c_str());
+                        } else {
+                            fmt::print(" {} - {}\n", it.param1.printable().c_str(), it.param2.printable().c_str());
+                        }
+                    }
+                    ASSERT(false);
                 }
-                ASSERT(false);
             }
             remainingDurableBytes -=
                 sizeof(KeyValueRef) +
@@ -5371,22 +5427,27 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction* tr, KeyRange keys)
 // We have to store the version the change feed was stopped at in the SS instead of just the stopped status
 // In addition to simplifying stopping logic, it enables communicating stopped status when fetching change feeds
 // from other SS correctly
-const Value changeFeedSSValue(KeyRangeRef const& range, Version popVersion, Version stopVersion) {
+const Value changeFeedSSValue(KeyRangeRef const& range,
+                              Version popVersion,
+                              Version stopVersion,
+                              Version metadataVersion) {
     BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
     wr << range;
     wr << popVersion;
     wr << stopVersion;
+    wr << metadataVersion;
     return wr.toValue();
 }

-std::tuple<KeyRange, Version, Version> decodeChangeFeedSSValue(ValueRef const& value) {
+std::tuple<KeyRange, Version, Version, Version> decodeChangeFeedSSValue(ValueRef const& value) {
     KeyRange range;
-    Version popVersion, stopVersion;
+    Version popVersion, stopVersion, metadataVersion;
     BinaryReader reader(value, IncludeVersion());
     reader >> range;
     reader >> popVersion;
     reader >> stopVersion;
-    return std::make_tuple(range, popVersion, stopVersion);
+    reader >> metadataVersion;
    return std::make_tuple(range, popVersion, stopVersion, metadataVersion);
 }

 ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) {
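The persisted per-feed value now carries a fourth field. A hedged round-trip sketch of the encode/decode pair above, using illustrative key and version literals that are not from the patch:

// Encode a feed's range, pop version, stop version, and the new metadata version, then decode it back.
Value v = changeFeedSSValue(KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("b")),
                            101 /* popVersion */,
                            MAX_VERSION /* stopVersion */,
                            95 /* metadataVersion */);
KeyRange range;
Version popVersion, stopVersion, metadataVersion;
std::tie(range, popVersion, stopVersion, metadataVersion) = decodeChangeFeedSSValue(v);
ASSERT(popVersion == 101 && stopVersion == MAX_VERSION && metadataVersion == 95);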
@@ -5420,10 +5481,12 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) {
         auto& mLV = self->addVersionToMutationLog(durableVersion);
         self->addMutationToMutationLog(
             mLV,
-            MutationRef(
-                MutationRef::SetValue,
-                persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
-                changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
+            MutationRef(MutationRef::SetValue,
+                        persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
+                        changeFeedSSValue(feed->second->range,
+                                          feed->second->emptyVersion + 1,
+                                          feed->second->stopVersion,
+                                          feed->second->metadataVersion)));
         if (feed->second->storageVersion != invalidVersion) {
             ++self->counters.kvSystemClearRanges;
             self->addMutationToMutationLog(mLV,
@@ -5515,7 +5578,8 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
                         persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
                         changeFeedSSValue(changeFeedInfo->range,
                                           changeFeedInfo->emptyVersion + 1,
-                                          changeFeedInfo->stopVersion)));
+                                          changeFeedInfo->stopVersion,
+                                          changeFeedInfo->metadataVersion)));
         data->addMutationToMutationLog(
             mLV,
             MutationRef(MutationRef::ClearRange,
@@ -5634,8 +5698,10 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
             mLV,
             MutationRef(MutationRef::SetValue,
                         persistChangeFeedKeys.begin.toString() + changeFeedInfo->id.toString(),
-                        changeFeedSSValue(
-                            changeFeedInfo->range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
+                        changeFeedSSValue(changeFeedInfo->range,
+                                          changeFeedInfo->emptyVersion + 1,
+                                          changeFeedInfo->stopVersion,
+                                          changeFeedInfo->metadataVersion)));
         data->addMutationToMutationLog(mLV,
                                        MutationRef(MutationRef::ClearRange,
                                                    changeFeedDurableKey(changeFeedInfo->id, 0),
@@ -5732,13 +5798,6 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
         }
     }

-    /*fmt::print("DBG: SS {} Feed {} possibly destroyed {}, {} metadata create, {} desired committed\n",
-                data->thisServerID.toString().substr(0, 4),
-                changeFeedInfo->id.printable(),
-                changeFeedInfo->possiblyDestroyed,
-                changeFeedInfo->metadataCreateVersion,
-                data->desiredOldestVersion.get());*/
-
     // There are two reasons for change_feed_not_registered:
     // 1. The feed was just created, but the ss mutation stream is ahead of the GRV that fetchChangeFeedApplier
     //    uses to read the change feed data from the database. In this case we need to wait and retry
@@ -5777,7 +5836,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
             data->changeFeedCleanupDurable[changeFeedInfo->id] = cleanupVersion;
         }

-        for (auto& it : data->changeFeedRemovals) {
+        for (auto& it : data->changeFeedDestroys) {
             it.second.send(changeFeedInfo->id);
         }
@@ -5793,7 +5852,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
 ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
                                                        KeyRange keys,
-                                                       PromiseStream<Key> removals,
+                                                       PromiseStream<Key> destroyedFeeds,
                                                        UID fetchKeysID) {

     // Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of the
@@ -5807,82 +5866,55 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
         .detail("FetchVersion", fetchVersion)
         .detail("FKID", fetchKeysID);

-    state std::set<Key> refreshedFeedIds;
-    state std::set<Key> destroyedFeedIds;
-    // before fetching feeds from other SS's, refresh any feeds we already have that are being marked as removed
+    state OverlappingChangeFeedsInfo feedMetadata = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
+    // rest of this actor needs to happen without waits that might yield to scheduler, to avoid races in feed metadata.
+
+    // Find set of feeds we currently have that were not present in fetch, to infer that they may have been destroyed.
+    state std::unordered_map<Key, Version> missingFeeds;
     auto ranges = data->keyChangeFeed.intersectingRanges(keys);
     for (auto& r : ranges) {
         for (auto& cfInfo : r.value()) {
-            auto feedCleanup = data->changeFeedCleanupDurable.find(cfInfo->id);
-            if (feedCleanup != data->changeFeedCleanupDurable.end() && cfInfo->removing && !cfInfo->destroyed) {
-                CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
-                destroyedFeedIds.insert(cfInfo->id);
-                cfInfo->removing = false;
-                // because we now have a gap in the metadata, it's possible this feed was destroyed
-                cfInfo->possiblyDestroyed = true;
-                // Set refreshInProgress, so that if this actor is replaced by an expanded move actor, the new actor
-                // picks up the refresh
-                cfInfo->refreshInProgress = true;
-                // reset fetch versions because everything previously fetched was cleaned up
-                cfInfo->fetchVersion = invalidVersion;
-                cfInfo->durableFetchVersion = NotifiedVersion();
-
-                TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
-                    .detail("RangeID", cfInfo->id)
-                    .detail("Range", cfInfo->range)
-                    .detail("FetchVersion", fetchVersion)
-                    .detail("EmptyVersion", cfInfo->emptyVersion)
-                    .detail("StopVersion", cfInfo->stopVersion)
-                    .detail("FKID", fetchKeysID);
-            } else if (cfInfo->refreshInProgress) {
-                CODE_PROBE(true, "Racing refreshes for same change feed in fetch");
-                destroyedFeedIds.insert(cfInfo->id);
+            if (cfInfo->removing && !cfInfo->destroyed) {
+                missingFeeds.insert({ cfInfo->id, cfInfo->metadataVersion });
             }
         }
     }

-    state std::vector<OverlappingChangeFeedEntry> feeds =
-        wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
-    // handle change feeds removed while fetching overlapping
-    while (removals.getFuture().isReady()) {
-        Key remove = waitNext(removals.getFuture());
-        for (int i = 0; i < feeds.size(); i++) {
-            if (feeds[i].rangeId == remove) {
-                swapAndPop(&feeds, i--);
+    // handle change feeds destroyed while fetching overlapping info
+    while (destroyedFeeds.getFuture().isReady()) {
+        Key destroyed = waitNext(destroyedFeeds.getFuture());
+        for (int i = 0; i < feedMetadata.feeds.size(); i++) {
+            if (feedMetadata.feeds[i].feedId == destroyed) {
+                missingFeeds.erase(destroyed); // feed definitely destroyed, no need to infer
+                swapAndPop(&feedMetadata.feeds, i--);
             }
         }
     }

     std::vector<Key> feedIds;
-    feedIds.reserve(feeds.size());
+    feedIds.reserve(feedMetadata.feeds.size());
     // create change feed metadata if it does not exist
-    for (auto& cfEntry : feeds) {
-        auto cleanupEntry = data->changeFeedCleanupDurable.find(cfEntry.rangeId);
+    for (auto& cfEntry : feedMetadata.feeds) {
+        auto cleanupEntry = data->changeFeedCleanupDurable.find(cfEntry.feedId);
         bool cleanupPending = cleanupEntry != data->changeFeedCleanupDurable.end();
-        feedIds.push_back(cfEntry.rangeId);
-        auto existingEntry = data->uidChangeFeed.find(cfEntry.rangeId);
+        auto existingEntry = data->uidChangeFeed.find(cfEntry.feedId);
         bool existing = existingEntry != data->uidChangeFeed.end();

         TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
-            .detail("RangeID", cfEntry.rangeId)
+            .detail("RangeID", cfEntry.feedId)
             .detail("Range", cfEntry.range)
             .detail("FetchVersion", fetchVersion)
             .detail("EmptyVersion", cfEntry.emptyVersion)
             .detail("StopVersion", cfEntry.stopVersion)
+            .detail("FeedMetadataVersion", cfEntry.feedMetadataVersion)
             .detail("Existing", existing)
+            .detail("ExistingMetadataVersion", existing ? existingEntry->second->metadataVersion : invalidVersion)
             .detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion)
             .detail("FKID", fetchKeysID);

         bool addMutationToLog = false;
         Reference<ChangeFeedInfo> changeFeedInfo;
-
-        auto fid = destroyedFeedIds.find(cfEntry.rangeId);
-        if (fid != destroyedFeedIds.end()) {
-            refreshedFeedIds.insert(cfEntry.rangeId);
-            destroyedFeedIds.erase(fid);
-        }

         if (!existing) {
             CODE_PROBE(cleanupPending,
                        "Fetch change feed which is cleanup pending. This means there was a move away and a move back, "
@@ -5890,24 +5922,51 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
             changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
             changeFeedInfo->range = cfEntry.range;
-            changeFeedInfo->id = cfEntry.rangeId;
+            changeFeedInfo->id = cfEntry.feedId;

             changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
             changeFeedInfo->stopVersion = cfEntry.stopVersion;
-            data->uidChangeFeed[cfEntry.rangeId] = changeFeedInfo;
+            data->uidChangeFeed[cfEntry.feedId] = changeFeedInfo;
             auto rs = data->keyChangeFeed.modify(cfEntry.range);
             for (auto r = rs.begin(); r != rs.end(); ++r) {
                 r->value().push_back(changeFeedInfo);
             }
-            data->keyChangeFeed.coalesce(cfEntry.range.contents());
+            data->keyChangeFeed.coalesce(cfEntry.range);

             addMutationToLog = true;
         } else {
             changeFeedInfo = existingEntry->second;
+            CODE_PROBE(cfEntry.feedMetadataVersion > data->version.get(),
+                       "Change Feed fetched future metadata version");
+
+            auto fid = missingFeeds.find(cfEntry.feedId);
+            if (fid != missingFeeds.end()) {
+                TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
+                    .detail("RangeID", changeFeedInfo->id.printable())
+                    .detail("Range", changeFeedInfo->range)
+                    .detail("FetchVersion", fetchVersion)
+                    .detail("EmptyVersion", changeFeedInfo->emptyVersion)
+                    .detail("StopVersion", changeFeedInfo->stopVersion)
+                    .detail("PreviousMetadataVersion", changeFeedInfo->metadataVersion)
+                    .detail("NewMetadataVersion", cfEntry.feedMetadataVersion)
+                    .detail("FKID", fetchKeysID);
+
+                missingFeeds.erase(fid);
+                ASSERT(!changeFeedInfo->destroyed);
+                ASSERT(changeFeedInfo->removing);
+                CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
+
+                changeFeedInfo->removing = false;
+                // reset fetch versions because everything previously fetched was cleaned up
+                changeFeedInfo->fetchVersion = invalidVersion;
+                changeFeedInfo->durableFetchVersion = NotifiedVersion();
+
+                addMutationToLog = true;
+            }

             if (changeFeedInfo->destroyed) {
-                // race where multiple feeds fetched overlapping change feed, one realized feed was missing and marked
-                // it removed+destroyed, then this one fetched the same info
+                CODE_PROBE(true, "Change feed fetched and destroyed by other fetch while fetching metadata");
                 continue;
             }
@@ -5927,82 +5986,63 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
                 addMutationToLog = true;
             }
         }

+        feedIds.push_back(cfEntry.feedId);
+        addMutationToLog |= changeFeedInfo->updateMetadataVersion(cfEntry.feedMetadataVersion);
+
         if (addMutationToLog) {
             ASSERT(changeFeedInfo.isValid());
-            auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
+            Version logV = data->data().getLatestVersion();
+            auto& mLV = data->addVersionToMutationLog(logV);
             data->addMutationToMutationLog(
                 mLV,
-                MutationRef(
-                    MutationRef::SetValue,
-                    persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(),
-                    changeFeedSSValue(cfEntry.range, changeFeedInfo->emptyVersion + 1, changeFeedInfo->stopVersion)));
+                MutationRef(MutationRef::SetValue,
+                            persistChangeFeedKeys.begin.toString() + cfEntry.feedId.toString(),
+                            changeFeedSSValue(cfEntry.range,
+                                              changeFeedInfo->emptyVersion + 1,
+                                              changeFeedInfo->stopVersion,
+                                              changeFeedInfo->metadataVersion)));
             // if we updated pop version, remove mutations
             while (!changeFeedInfo->mutations.empty() &&
                    changeFeedInfo->mutations.front().version <= changeFeedInfo->emptyVersion) {
                 changeFeedInfo->mutations.pop_front();
             }
+            if (BUGGIFY) {
+                data->maybeInjectTargetedRestart(logV);
+            }
         }
     }

-    CODE_PROBE(!refreshedFeedIds.empty(), "Feed refreshed between move away and move back");
-    CODE_PROBE(!destroyedFeedIds.empty(), "Feed destroyed between move away and move back");
-    for (auto& feedId : refreshedFeedIds) {
-        auto existingEntry = data->uidChangeFeed.find(feedId);
-        if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed ||
-            !existingEntry->second->refreshInProgress) {
-            CODE_PROBE(true, "feed refreshed");
+    for (auto& feed : missingFeeds) {
+        auto existingEntry = data->uidChangeFeed.find(feed.first);
+        ASSERT(existingEntry != data->uidChangeFeed.end());
+        ASSERT(existingEntry->second->removing);
+        ASSERT(!existingEntry->second->destroyed);
+
+        Version fetchedMetadataVersion = feedMetadata.getFeedMetadataVersion(existingEntry->second->range);
+        Version lastMetadataVersion = feed.second;
+        // Look for case where feed's range was moved away, feed was destroyed, and then feed's range was moved back.
+        // This happens where feed is removing, the fetch metadata is higher than the moved away version, and the feed
+        // isn't in the fetched response. In that case, the feed must have been destroyed between lastMetadataVersion
+        // and fetchedMetadataVersion
+        if (lastMetadataVersion >= fetchedMetadataVersion) {
+            CODE_PROBE(true, "Change Feed fetched higher metadata version before moved away");
             continue;
         }
-        // Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
-        // it
-        // We may just want to refactor this so updateStorage does explicit deletes based on
-        // changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
-        // Then we wouldn't have to reset anything here or above
-        // Do the mutation log update here instead of above to ensure we only add it back to the mutation log if we're
-        // sure it wasn't deleted in the metadata gap
-        Version metadataVersion = data->data().getLatestVersion();
-        auto& mLV = data->addVersionToMutationLog(metadataVersion);
-        data->addMutationToMutationLog(
-            mLV,
-            MutationRef(MutationRef::SetValue,
-                        persistChangeFeedKeys.begin.toString() + existingEntry->second->id.toString(),
-                        changeFeedSSValue(existingEntry->second->range,
-                                          existingEntry->second->emptyVersion + 1,
-                                          existingEntry->second->stopVersion)));
-        TraceEvent(SevDebug, "PersistingResetChangeFeedInfo", data->thisServerID)
-            .detail("RangeID", existingEntry->second->id)
-            .detail("Range", existingEntry->second->range)
-            .detail("FetchVersion", fetchVersion)
-            .detail("EmptyVersion", existingEntry->second->emptyVersion)
-            .detail("StopVersion", existingEntry->second->stopVersion)
-            .detail("FKID", fetchKeysID)
-            .detail("MetadataVersion", metadataVersion);
-        existingEntry->second->refreshInProgress = false;
-    }
-    for (auto& feedId : destroyedFeedIds) {
-        auto existingEntry = data->uidChangeFeed.find(feedId);
-        if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
-            CODE_PROBE(true, "feed refreshed but then destroyed elsewhere");
-            continue;
-        }
-
-        /*fmt::print("DBG: SS {} fetching feed {} was refreshed but not present!! assuming destroyed\n",
-                    data->thisServerID.toString().substr(0, 4),
-                    feedId.printable());*/

         Version cleanupVersion = data->data().getLatestVersion();

         CODE_PROBE(true, "Destroying change feed from fetch metadata"); //
         TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
-            .detail("RangeID", feedId)
+            .detail("RangeID", feed.first)
             .detail("Range", existingEntry->second->range)
             .detail("Version", cleanupVersion)
             .detail("FKID", fetchKeysID);

         if (g_network->isSimulated()) {
-            ASSERT(g_simulator.validationData.allDestroyedChangeFeedIDs.count(feedId.toString()));
+            // verify that the feed was actually destroyed and it's not an error in this inference logic
+            ASSERT(g_simulator.validationData.allDestroyedChangeFeedIDs.count(feed.first.toString()));
         }

-        Key beginClearKey = feedId.withPrefix(persistChangeFeedKeys.begin);
+        Key beginClearKey = feed.first.withPrefix(persistChangeFeedKeys.begin);

         auto& mLV = data->addVersionToMutationLog(cleanupVersion);
         data->addMutationToMutationLog(mLV,
@@ -6010,15 +6050,18 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
         ++data->counters.kvSystemClearRanges;
         data->addMutationToMutationLog(mLV,
                                        MutationRef(MutationRef::ClearRange,
-                                                   changeFeedDurableKey(feedId, 0),
-                                                   changeFeedDurableKey(feedId, cleanupVersion)));
+                                                   changeFeedDurableKey(feed.first, 0),
+                                                   changeFeedDurableKey(feed.first, cleanupVersion)));
         ++data->counters.kvSystemClearRanges;

         existingEntry->second->destroy(cleanupVersion);
-        data->changeFeedCleanupDurable[feedId] = cleanupVersion;
+        data->changeFeedCleanupDurable[feed.first] = cleanupVersion;

-        for (auto& it : data->changeFeedRemovals) {
-            it.second.send(feedId);
+        for (auto& it : data->changeFeedDestroys) {
+            it.second.send(feed.first);
+        }
+        if (BUGGIFY) {
+            data->maybeInjectTargetedRestart(cleanupVersion);
         }
     }

     return feedIds;
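The missingFeeds loop above infers destruction rather than observing it directly: if the fetch covered a metadata version newer than the version at which the local copy was moved away, and the feed still was not in the response, the feed must have been destroyed in between. A condensed restatement of that decision as a hypothetical helper (not part of the patch):

// Returns true when a feed that is locally marked 'removing' can be treated as destroyed based on
// a metadata fetch that did not contain it.
bool inferFeedDestroyed(Version lastLocalMetadataVersion, Version fetchedMetadataVersion) {
    // If the fetch is not strictly newer than what we already saw, the absence proves nothing;
    // the feed may simply have been moved away before the version the metadata was fetched at.
    return fetchedMetadataVersion > lastLocalMetadataVersion;
}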
@@ -6031,7 +6074,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer* data,
                                                                    KeyRange keys,
                                                                    Version beginVersion,
                                                                    Version endVersion,
-                                                                   PromiseStream<Key> removals,
+                                                                   PromiseStream<Key> destroyedFeeds,
                                                                    std::vector<Key>* feedIds,
                                                                    std::unordered_set<Key> newFeedIds) {
     state std::unordered_map<Key, Version> feedMaxFetched;
@@ -6060,7 +6103,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer* data,
         loop {
             Future<Version> nextFeed = Never();
-            if (!removals.getFuture().isReady()) {
+            if (!destroyedFeeds.getFuture().isReady()) {
                 bool done = true;
                 while (!feedFetches.empty()) {
                     if (feedFetches.begin()->second.isReady()) {
@@ -6080,11 +6123,11 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer* data,
                 }
             }
             choose {
-                when(state Key remove = waitNext(removals.getFuture())) {
+                when(state Key destroyed = waitNext(destroyedFeeds.getFuture())) {
                     wait(delay(0));
-                    feedFetches.erase(remove);
+                    feedFetches.erase(destroyed);
                     for (int i = 0; i < feedIds->size(); i++) {
-                        if ((*feedIds)[i] == remove) {
+                        if ((*feedIds)[i] == destroyed) {
                             swapAndPop(feedIds, i--);
                         }
                     }
@@ -6095,7 +6138,7 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer* data,
     } catch (Error& e) {
         if (!data->shuttingDown) {
-            data->changeFeedRemovals.erase(fetchKeysID);
+            data->changeFeedDestroys.erase(fetchKeysID);
         }
         throw;
     }
@@ -6108,6 +6151,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
     state Future<Void> warningLogger = logFetchKeysWarning(shard);
     state const double startTime = now();
     state Version fetchVersion = invalidVersion;
+
+    state PromiseStream<Key> destroyedFeeds;
     state FetchKeysMetricReporter metricReporter(fetchKeysID,
                                                  startTime,
                                                  keys,
@@ -6116,17 +6161,27 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
                                                  data->counters.bytesFetched,
                                                  data->counters.kvFetched);

+    // need to set this at the very start of the fetch, to handle any private change feed destroy mutations we get for
+    // this key range, that apply to change feeds we don't know about yet because their metadata hasn't been fetched yet
+    data->changeFeedDestroys[fetchKeysID] = destroyedFeeds;
+
     // delay(0) to force a return to the run loop before the work of fetchKeys is started.
     // This allows adding->start() to be called inline with CSK.
-    wait(data->coreStarted.getFuture() && delay(0));
-
-    // On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped if
-    // added before latest version advances.
-    // To ensure this doesn't happen, we wait for version to increase by one if this fetchKeys was initiated by a
-    // changeServerKeys from restoreDurableState
-    if (data->version.get() == data->durableVersion.get()) {
-        wait(data->version.whenAtLeast(data->version.get() + 1));
-        wait(delay(0));
+    try {
+        wait(data->coreStarted.getFuture() && delay(0));
+
+        // On SS Reboot, durableVersion == latestVersion, so any mutations we add to the mutation log would be skipped
+        // if added before latest version advances. To ensure this doesn't happen, we wait for version to increase by
+        // one if this fetchKeys was initiated by a changeServerKeys from restoreDurableState
+        if (data->version.get() == data->durableVersion.get()) {
+            wait(data->version.whenAtLeast(data->version.get() + 1));
+            wait(delay(0));
+        }
+    } catch (Error& e) {
+        if (!data->shuttingDown) {
+            data->changeFeedDestroys.erase(fetchKeysID);
+        }
+        throw e;
     }

@@ -6138,9 +6193,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
     try {
             .detail("Version", data->version.get())
             .detail("FKID", fetchKeysID);

-        state PromiseStream<Key> removals;
-        data->changeFeedRemovals[fetchKeysID] = removals;
-        state Future<std::vector<Key>> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, removals, fetchKeysID);
+        state Future<std::vector<Key>> fetchCFMetadata =
+            fetchChangeFeedMetadata(data, keys, destroyedFeeds, fetchKeysID);

         validate(data);

@@ -6397,8 +6451,14 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
         // being recovered. Instead we wait for the updateStorage loop to commit something (and consequently also what
         // we have written)

-        state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(
-            data, fetchKeysID, keys, 0, fetchVersion + 1, removals, &changeFeedsToFetch, std::unordered_set<Key>());
+        state Future<std::unordered_map<Key, Version>> feedFetchMain = dispatchChangeFeeds(data,
+                                                                                           fetchKeysID,
+                                                                                           keys,
+                                                                                           0,
+                                                                                           fetchVersion + 1,
+                                                                                           destroyedFeeds,
+                                                                                           &changeFeedsToFetch,
+                                                                                           std::unordered_set<Key>());

         state Future<Void> fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1);
         state Future<Void> dataArrive = data->version.whenAtLeast(fetchVersion);
@@ -6461,7 +6521,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
                                 keys,
                                 fetchVersion + 1,
                                 shard->transferredVersion,
-                                removals,
+                                destroyedFeeds,
                                 &changeFeedsToFetch,
                                 newChangeFeeds);
@@ -6515,7 +6575,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
             }
         }

-        data->changeFeedRemovals.erase(fetchKeysID);
+        data->changeFeedDestroys.erase(fetchKeysID);

         shard->phase = AddingShard::Waiting;
@@ -6571,7 +6631,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
             .errorUnsuppressed(e)
             .detail("Version", data->version.get());
         if (!data->shuttingDown) {
-            data->changeFeedRemovals.erase(fetchKeysID);
+            data->changeFeedDestroys.erase(fetchKeysID);
         }
         if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
             if (shard->phase < AddingShard::FetchingCF) {
@@ -6824,11 +6884,15 @@ void cleanUpChangeFeeds(StorageServer* data, const KeyRangeRef& keys, Version version)
             auto feed = data->uidChangeFeed.find(f.first);
             if (feed != data->uidChangeFeed.end()) {
+                feed->second->updateMetadataVersion(version);
                 feed->second->removing = true;
-                feed->second->refreshInProgress = false;
                 feed->second->moved(feed->second->range);
                 feed->second->newMutations.trigger();
             }
+
+            if (BUGGIFY) {
+                data->maybeInjectTargetedRestart(durableVersion);
+            }
         } else {
             // if just part of feed's range is moved away
             auto feed = data->uidChangeFeed.find(f.first);
@@ -7449,7 +7513,7 @@ private:
                 .detail("Status", status);

             // Because of data moves, we can get mutations operating on a change feed we don't yet know about, because
-            // the fetch hasn't started yet
+            // the metadata fetch hasn't started yet
             bool createdFeed = false;
             if (feed == data->uidChangeFeed.end() && status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
                 createdFeed = true;
@@ -7481,6 +7545,9 @@ private:
                 }
                 data->keyChangeFeed.coalesce(changeFeedRange.contents());
             }
+            if (feed != data->uidChangeFeed.end()) {
+                feed->second->updateMetadataVersion(currentVersion);
+            }

             bool popMutationLog = false;
             bool addMutationToLog = false;
@@ -7542,22 +7609,29 @@ private:
                 feed->second->destroy(currentVersion);
                 data->changeFeedCleanupDurable[feed->first] = cleanupVersion;
+
+                if (BUGGIFY) {
+                    data->maybeInjectTargetedRestart(cleanupVersion);
+                }
             }

             if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
-                for (auto& it : data->changeFeedRemovals) {
+                for (auto& it : data->changeFeedDestroys) {
                     it.second.send(changeFeedId);
                 }
             }

             if (addMutationToLog) {
-                auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
+                Version logV = data->data().getLatestVersion();
+                auto& mLV = data->addVersionToMutationLog(logV);
                 data->addMutationToMutationLog(
                     mLV,
                     MutationRef(MutationRef::SetValue,
                                 persistChangeFeedKeys.begin.toString() + changeFeedId.toString(),
-                                changeFeedSSValue(
-                                    feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
+                                changeFeedSSValue(feed->second->range,
+                                                  feed->second->emptyVersion + 1,
+                                                  feed->second->stopVersion,
+                                                  feed->second->metadataVersion)));
                 if (popMutationLog) {
                     ++data->counters.kvSystemClearRanges;
                     data->addMutationToMutationLog(mLV,
@@ -7565,6 +7639,9 @@ private:
                                                    changeFeedDurableKey(feed->second->id, 0),
                                                    changeFeedDurableKey(feed->second->id, popVersion)));
                 }
+                if (BUGGIFY) {
+                    data->maybeInjectTargetedRestart(logV);
+                }
             }
         } else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) &&
                    m.param1.startsWith(TenantMetadata::tenantMapPrivatePrefix)) {
@ -7777,6 +7854,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
} }
} }
if (data->maybeInjectDelay()) {
wait(delay(deterministicRandom()->random01() * 10.0));
}
while (data->byteSampleClearsTooLarge.get()) {
wait(data->byteSampleClearsTooLarge.onChange());
}
@@ -8526,6 +8607,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
.detail("NewOldestVersion", newOldestVersion)
.detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
CODE_PROBE(true, "SS rebooting after durable");
// To avoid a brokenPromise error, which is caused by the sender of durableInProgress (i.e., this
// process) never setting durableInProgress, we should set durableInProgress before sending the
// please_reboot() error. Otherwise, in the race situation when the storage server receives both reboot and
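For context, the ordering the comment above asks for is roughly the following pattern (an illustration of the idea, not a verbatim excerpt of the surrounding code):

// Resolve durableInProgress first, then surface the reboot, so a concurrent
// wait on durableInProgress never observes a broken promise.
durableInProgress.sendError(please_reboot());
throw please_reboot();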
@@ -8674,7 +8756,8 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
// ASSERT( self->debug_inApplyUpdate );
ASSERT(!keys.empty());
Version logV = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(logV);
KeyRange availableKeys = KeyRangeRef(persistShardAvailableKeys.begin.toString() + keys.begin.toString(),
persistShardAvailableKeys.begin.toString() + keys.end.toString());
@@ -8710,6 +8793,10 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
.detail("DeleteVersion", mLV.version + 1);
}
}
if (BUGGIFY) {
self->maybeInjectTargetedRestart(logV);
}
}
void updateStorageShard(StorageServer* data, StorageServerShard shard) {
@@ -8746,7 +8833,8 @@ void updateStorageShard(StorageServer* data, StorageServerShard shard) {
void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) {
ASSERT(!keys.empty());
Version logV = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(logV);
KeyRange assignedKeys = KeyRangeRef(persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
persistShardAssignedKeys.begin.toString() + keys.end.toString());
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
@@ -8763,6 +8851,10 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
assignedKeys.end,
endAssigned ? LiteralStringRef("1") : LiteralStringRef("0")));
}
if (BUGGIFY) {
self->maybeInjectTargetedRestart(logV);
}
}
void StorageServerDisk::clearRange(KeyRangeRef keys) {
@@ -9166,13 +9258,15 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
KeyRange changeFeedRange;
Version popVersion, stopVersion, metadataVersion;
std::tie(changeFeedRange, popVersion, stopVersion, metadataVersion) =
decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
TraceEvent(SevDebug, "RestoringChangeFeed", data->thisServerID)
.detail("RangeID", changeFeedId)
.detail("Range", changeFeedRange)
.detail("StopVersion", stopVersion)
.detail("PopVer", popVersion)
.detail("MetadataVersion", metadataVersion);
Reference<ChangeFeedInfo> changeFeedInfo(new ChangeFeedInfo());
changeFeedInfo->range = changeFeedRange;
changeFeedInfo->id = changeFeedId;
@@ -9180,6 +9274,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
changeFeedInfo->storageVersion = version;
changeFeedInfo->emptyVersion = popVersion - 1;
changeFeedInfo->stopVersion = stopVersion;
changeFeedInfo->metadataVersion = metadataVersion;
data->uidChangeFeed[changeFeedId] = changeFeedInfo;
auto rs = data->keyChangeFeed.modify(changeFeedRange);
for (auto r = rs.begin(); r != rs.end(); ++r) {
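The decode above implies the persisted per-feed value now carries four fields. A minimal sketch of the matching encode side, assuming the BinaryWriter/IncludeVersion serialization style used for other storage server metadata (the exact protocol-version tag is an assumption, not taken from this patch):

// Sketch only: serialize (range, popVersion, stopVersion, metadataVersion) for
// persistChangeFeedKeys; must stay in sync with decodeChangeFeedSSValue used above.
Value changeFeedSSValue(KeyRangeRef const& range, Version popVersion, Version stopVersion, Version metadataVersion) {
	BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
	wr << range;
	wr << popVersion;
	wr << stopVersion;
	wr << metadataVersion;
	return wr.toValue();
}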

@@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

@@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

@@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [3, 4, 5]

@@ -2,6 +2,8 @@
blobGranulesEnabled = true
allowDefaultTenant = false
allowDisablingTenants = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

@@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]

@@ -1,6 +1,8 @@
[configuration]
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]