diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 758020adf8..20d48846cc 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -111,6 +111,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT, 20 ); init( QUARANTINE_TSS_ON_MISMATCH, true ); if( randomize && BUGGIFY ) QUARANTINE_TSS_ON_MISMATCH = false; // if true, a tss mismatch will put the offending tss in quarantine. If false, it will just be killed init( CHANGE_FEED_EMPTY_BATCH_TIME, 0.005 ); + init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true; //KeyRangeMap init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index e1c9d6e82c..63ecafa06c 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7900,8 +7900,9 @@ Future>> Transaction::readBlobGranules return readBlobGranulesActor(this, range, begin, readVersion, readVersionOut); } -ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) { +ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) { state ReadYourWritesTransaction tr(cx); + state Version version = invalidVersion; loop { try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); @@ -7911,12 +7912,13 @@ ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware tr.set(perpetualStorageWiggleKey, enable ? "1"_sr : "0"_sr); wait(tr.commit()); + version = tr.getCommittedVersion(); break; } catch (Error& e) { wait(tr.onError(e)); } } - return Void(); + return version; } ACTOR Future>> readStorageWiggleValues(Database cx, diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index c18984f058..0e2d989f6d 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -584,6 +584,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( REMOVE_RETRY_DELAY, 1.0 ); init( MOVE_KEYS_KRM_LIMIT, 2000 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT = 2; init( MOVE_KEYS_KRM_LIMIT_BYTES, 1e5 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT_BYTES = 5e4; //This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an attempt to read a key range map + init( MOVE_SHARD_KRM_ROW_LIMIT, 20000 ); + init( MOVE_SHARD_KRM_BYTE_LIMIT, 1e6 ); init( MAX_SKIP_TAGS, 1 ); //The TLogs require tags to be densely packed to be memory efficient, so be careful increasing this knob init( MAX_ADDED_SOURCES_MULTIPLIER, 2.0 ); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 8b5c9b6fb6..2fb2bf7180 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -27,6 +27,9 @@ #include "flow/serialize.h" #include "flow/UnitTest.h" +FDB_DEFINE_BOOLEAN_PARAM(AssignEmptyRange); +FDB_DEFINE_BOOLEAN_PARAM(UnassignShard); + const KeyRef systemKeysPrefix = LiteralStringRef("\xff"); const KeyRangeRef normalKeys(KeyRef(), systemKeysPrefix); const KeyRangeRef systemKeys(systemKeysPrefix, LiteralStringRef("\xff\xff")); @@ -43,6 +46,10 @@ const KeyRangeRef keyServersKeyServersKeys(LiteralStringRef("\xff/keyServers/\xf LiteralStringRef("\xff/keyServers/\xff/keyServers0")); const KeyRef keyServersKeyServersKey = keyServersKeyServersKeys.begin; +// These constants are selected to be easily recognized during debugging. 
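+// anonymousShardId is the temporary ID carried by shards that predate shard IDs (see SystemData.h); emptyShardId
+// is used as the second half of a serverKeys UID to mark a range that was assigned as empty (see newShardId()).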
+const UID anonymousShardId = UID(0x666666, 0x88888888); +const uint64_t emptyShardId = 0x7777777; + const Key keyServersKey(const KeyRef& k) { return k.withPrefix(keyServersPrefix); } @@ -87,6 +94,21 @@ const Value keyServersValue(RangeResult result, const std::vector& src, con return keyServersValue(srcTag, destTag); } + +const Value keyServersValue(const std::vector& src, + const std::vector& dest, + const UID& srcID, + const UID& destID) { + BinaryWriter wr(IncludeVersion(ProtocolVersion::withShardEncodeLocationMetaData())); + if (dest.empty()) { + ASSERT(!destID.isValid()); + wr << src << dest << srcID; + } else { + wr << src << dest << srcID << destID; + } + return wr.toValue(); +} + const Value keyServersValue(const std::vector& srcTag, const std::vector& destTag) { // src and dest are expected to be sorted BinaryWriter wr(IncludeVersion(ProtocolVersion::withKeyServerValueV2())); @@ -106,6 +128,11 @@ void decodeKeyServersValue(RangeResult result, } BinaryReader rd(value, IncludeVersion()); + if (rd.protocolVersion().hasShardEncodeLocationMetaData()) { + UID srcId, destId; + decodeKeyServersValue(result, value, src, dest, srcId, destId); + return; + } if (!rd.protocolVersion().hasKeyServerValueV2()) { rd >> src >> dest; return; @@ -145,6 +172,42 @@ void decodeKeyServersValue(RangeResult result, } } +void decodeKeyServersValue(RangeResult result, + const ValueRef& value, + std::vector& src, + std::vector& dest, + UID& srcID, + UID& destID, + bool missingIsError) { + src.clear(); + dest.clear(); + srcID = UID(); + destID = UID(); + + if (value.size() == 0) { + return; + } + + BinaryReader rd(value, IncludeVersion()); + if (rd.protocolVersion().hasShardEncodeLocationMetaData()) { + rd >> src >> dest >> srcID; + if (rd.empty()) { + ASSERT(dest.empty()); + } else { + rd >> destID; + rd.assertEnd(); + } + } else { + decodeKeyServersValue(result, value, src, dest, missingIsError); + if (!src.empty()) { + srcID = anonymousShardId; + } + if (!dest.empty()) { + destID = anonymousShardId; + } + } +} + void decodeKeyServersValue(std::map const& tag_uid, const ValueRef& value, std::vector& src, @@ -167,6 +230,16 @@ void decodeKeyServersValue(std::map const& tag_uid, if (value.size() != sizeof(ProtocolVersion) + sizeof(int) + srcLen * sizeof(Tag) + sizeof(int) + destLen * sizeof(Tag)) { rd >> src >> dest; + if (rd.protocolVersion().hasShardEncodeLocationMetaData()) { + UID srcId, destId; + rd >> srcId; + if (rd.empty()) { + ASSERT(dest.empty()); + destId = UID(); + } else { + rd >> destId; + } + } rd.assertEnd(); return; } @@ -242,6 +315,33 @@ CheckpointMetaData decodeCheckpointValue(const ValueRef& value) { return checkpoint; } +// "\xff/dataMoves/[[UID]] := [[DataMoveMetaData]]" +const KeyRangeRef dataMoveKeys("\xff/dataMoves/"_sr, "\xff/dataMoves0"_sr); +const Key dataMoveKeyFor(UID dataMoveId) { + BinaryWriter wr(Unversioned()); + wr.serializeBytes(dataMoveKeys.begin); + wr << dataMoveId; + return wr.toValue(); +} + +const Value dataMoveValue(const DataMoveMetaData& dataMoveMetaData) { + return ObjectWriter::toValue(dataMoveMetaData, IncludeVersion()); +} + +UID decodeDataMoveKey(const KeyRef& key) { + UID id; + BinaryReader rd(key.removePrefix(dataMoveKeys.begin), Unversioned()); + rd >> id; + return id; +} + +DataMoveMetaData decodeDataMoveValue(const ValueRef& value) { + DataMoveMetaData dataMove; + ObjectReader reader(value.begin(), IncludeVersion()); + reader.deserialize(dataMove); + return dataMove; +} + // "\xff/cacheServer/[[UID]] := StorageServerInterface" const KeyRangeRef 
storageCacheServerKeys(LiteralStringRef("\xff/cacheServer/"), LiteralStringRef("\xff/cacheServer0")); const KeyRef storageCacheServersPrefix = storageCacheServerKeys.begin; @@ -308,6 +408,20 @@ const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTr serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty. serverKeysFalse; +const UID newShardId(const uint64_t physicalShardId, AssignEmptyRange assignEmptyRange, UnassignShard unassignShard) { + uint64_t split = 0; + if (assignEmptyRange) { + split = emptyShardId; + } else if (unassignShard) { + split = 0; + } else { + do { + split = deterministicRandom()->randomUInt64(); + } while (split == anonymousShardId.second() || split == 0 || split == emptyShardId); + } + return UID(physicalShardId, split); +} + const Key serverKeysKey(UID serverID, const KeyRef& key) { BinaryWriter wr(Unversioned()); wr.serializeBytes(serverKeysPrefix); @@ -342,7 +456,43 @@ std::pair serverKeysDecodeServerBegin(const KeyRef& key) { } bool serverHasKey(ValueRef storedValue) { - return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange; + UID teamId; + bool assigned, emptyRange; + decodeServerKeysValue(storedValue, assigned, emptyRange, teamId); + return assigned; +} + +const Value serverKeysValue(const UID& id) { + if (!id.isValid()) { + return serverKeysFalse; + } + + BinaryWriter wr(IncludeVersion(ProtocolVersion::withShardEncodeLocationMetaData())); + wr << id; + return wr.toValue(); +} + +void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) { + if (value.size() == 0) { + id = UID(); + assigned = false; + emptyRange = false; + } else if (value == serverKeysTrue) { + assigned = true; + emptyRange = false; + } else if (value == serverKeysTrueEmptyRange) { + assigned = true; + emptyRange = true; + } else if (value == serverKeysFalse) { + assigned = false; + emptyRange = false; + } else { + BinaryReader rd(value, IncludeVersion()); + ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData()); + rd >> id; + assigned = id.second() != 0; + emptyRange = id.second() == emptyShardId; + } } const KeyRef cacheKeysPrefix = LiteralStringRef("\xff\x02/cacheKeys/"); @@ -1466,3 +1616,91 @@ TEST_CASE("/SystemData/SerDes/SSI") { return Void(); } + +// Tests compatibility of different keyServersValue() and decodeKeyServersValue(). 
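+// A value written by the new overload (with shard IDs) must decode through both the new and the legacy decoders,
+// and a legacy tag-encoded value must decode with srcID/destID reported as anonymousShardId for any non-empty
+// src/dest set (and as an invalid UID when the corresponding set is empty).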
+TEST_CASE("noSim/SystemData/compat/KeyServers") { + printf("testing keyServers serdes\n"); + std::vector src, dest; + std::map tag_uid; + std::map uid_tag; + std::vector srcTag, destTag; + const int n = 3; + int8_t locality = 1; + uint16_t id = 1; + UID srcId = deterministicRandom()->randomUniqueID(), destId = deterministicRandom()->randomUniqueID(); + for (int i = 0; i < n; ++i) { + src.push_back(deterministicRandom()->randomUniqueID()); + tag_uid.emplace(Tag(locality, id), src.back()); + uid_tag.emplace(src.back(), Tag(locality, id++)); + dest.push_back(deterministicRandom()->randomUniqueID()); + tag_uid.emplace(Tag(locality, id), dest.back()); + uid_tag.emplace(dest.back(), Tag(locality, id++)); + } + std::sort(src.begin(), src.end()); + std::sort(dest.begin(), dest.end()); + RangeResult idTag; + for (int i = 0; i < src.size(); ++i) { + idTag.push_back_deep(idTag.arena(), KeyValueRef(serverTagKeyFor(src[i]), serverTagValue(uid_tag[src[i]]))); + } + for (int i = 0; i < dest.size(); ++i) { + idTag.push_back_deep(idTag.arena(), KeyValueRef(serverTagKeyFor(dest[i]), serverTagValue(uid_tag[dest[i]]))); + } + + auto decodeAndVerify = + [&src, &dest, &tag_uid, &idTag](ValueRef v, const UID expectedSrcId, const UID expectedDestId) { + std::vector resSrc, resDest; + UID resSrcId, resDestId; + + decodeKeyServersValue(idTag, v, resSrc, resDest, resSrcId, resDestId); + TraceEvent("VerifyKeyServersSerDes") + .detail("ExpectedSrc", describe(src)) + .detail("ActualSrc", describe(resSrc)) + .detail("ExpectedDest", describe(dest)) + .detail("ActualDest", describe(resDest)) + .detail("ExpectedDestID", expectedDestId) + .detail("ActualDestID", resDestId) + .detail("ExpectedSrcID", expectedSrcId) + .detail("ActualSrcID", resSrcId); + ASSERT(std::equal(src.begin(), src.end(), resSrc.begin())); + ASSERT(std::equal(dest.begin(), dest.end(), resDest.begin())); + ASSERT(resSrcId == expectedSrcId); + ASSERT(resDestId == expectedDestId); + + resSrc.clear(); + resDest.clear(); + decodeKeyServersValue(idTag, v, resSrc, resDest); + ASSERT(std::equal(src.begin(), src.end(), resSrc.begin())); + ASSERT(std::equal(dest.begin(), dest.end(), resDest.begin())); + + resSrc.clear(); + resDest.clear(); + decodeKeyServersValue(tag_uid, v, resSrc, resDest); + ASSERT(std::equal(src.begin(), src.end(), resSrc.begin())); + ASSERT(std::equal(dest.begin(), dest.end(), resDest.begin())); + }; + + Value v = keyServersValue(src, dest, srcId, destId); + decodeAndVerify(v, srcId, destId); + + printf("ssi serdes test part.1 complete\n"); + + v = keyServersValue(idTag, src, dest); + decodeAndVerify(v, anonymousShardId, anonymousShardId); + + printf("ssi serdes test part.2 complete\n"); + + dest.clear(); + destId = UID(); + + v = keyServersValue(src, dest, srcId, destId); + decodeAndVerify(v, srcId, destId); + + printf("ssi serdes test part.3 complete\n"); + + v = keyServersValue(idTag, src, dest); + decodeAndVerify(v, anonymousShardId, UID()); + + printf("ssi serdes test complete\n"); + + return Void(); +} \ No newline at end of file diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index fa1ef9fa7c..6e60b56cbe 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -110,6 +110,7 @@ public: int RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT; bool QUARANTINE_TSS_ON_MISMATCH; double CHANGE_FEED_EMPTY_BATCH_TIME; + bool SHARD_ENCODE_LOCATION_METADATA; // KeyRangeMap int KRM_GET_RANGE_LIMIT; diff --git a/fdbclient/include/fdbclient/NativeAPI.actor.h 
b/fdbclient/include/fdbclient/NativeAPI.actor.h index 009c22d7cd..3abab222bb 100644 --- a/fdbclient/include/fdbclient/NativeAPI.actor.h +++ b/fdbclient/include/fdbclient/NativeAPI.actor.h @@ -537,7 +537,8 @@ inline uint64_t getWriteOperationCost(uint64_t bytes) { // Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value // will be 1. Otherwise, the value will be 0. -ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware = LockAware::False); +// Returns the FDB version at which the transaction was committed. +ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware = LockAware::False); ACTOR Future>> readStorageWiggleValues(Database cx, bool primary, diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index c6c926ab7a..7d8ec81444 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -520,6 +520,8 @@ public: int MOVE_KEYS_KRM_LIMIT_BYTES; // This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT // (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an // attempt to read a key range map + int MOVE_SHARD_KRM_ROW_LIMIT; + int MOVE_SHARD_KRM_BYTE_LIMIT; int MAX_SKIP_TAGS; double MAX_ADDED_SOURCES_MULTIPLIER; diff --git a/fdbclient/include/fdbclient/StorageCheckpoint.h b/fdbclient/include/fdbclient/StorageCheckpoint.h index d7890eb6ce..8de9380147 100644 --- a/fdbclient/include/fdbclient/StorageCheckpoint.h +++ b/fdbclient/include/fdbclient/StorageCheckpoint.h @@ -86,4 +86,45 @@ struct CheckpointMetaData { } }; -#endif +// A DataMoveMetaData object corresponds to a single data move. +struct DataMoveMetaData { + enum Phase { + InvalidPhase = 0, + Prepare = 1, // System keyspace is being modified. + Running = 2, // System keyspace has been modified, data move in action. + Completing = 3, // Data transfer has finished, finalizing system keyspace. + Deleting = 4, // Data move is cancelled. + }; + + constexpr static FileIdentifier file_identifier = 13804362; + UID id; // A unique id for this data move. + Version version; + KeyRange range; + int priority; + std::set src; + std::set dest; + int16_t phase; // DataMoveMetaData::Phase. 
+ + DataMoveMetaData() = default; + DataMoveMetaData(UID id, Version version, KeyRange range) + : id(id), version(version), range(std::move(range)), priority(0) {} + DataMoveMetaData(UID id, KeyRange range) : id(id), version(invalidVersion), range(std::move(range)), priority(0) {} + + Phase getPhase() const { return static_cast(phase); } + + void setPhase(Phase phase) { this->phase = static_cast(phase); } + + std::string toString() const { + std::string res = "DataMoveMetaData: [ID]: " + id.shortString() + " [Range]: " + range.toString() + + " [Phase]: " + std::to_string(static_cast(phase)) + + " [Source Servers]: " + describe(src) + " [Destination Servers]: " + describe(dest); + return res; + } + + template + void serialize(Ar& ar) { + serializer(ar, id, version, range, phase, src, dest); + } +}; + +#endif \ No newline at end of file diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 53b652ee9d..088d137d5c 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -33,6 +33,9 @@ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wunused-variable" +FDB_DECLARE_BOOLEAN_PARAM(AssignEmptyRange); +FDB_DECLARE_BOOLEAN_PARAM(UnassignShard); + struct RestoreLoaderInterface; struct RestoreApplierInterface; struct RestoreMasterInterface; @@ -49,14 +52,27 @@ extern const KeyRef afterAllKeys; // An internal mapping of where shards are located in the database. [[begin]] is the start of the shard range // and the result is a list of serverIDs or Tags where these shards are located. These values can be changed // as data movement occurs. +// With ShardEncodeLocationMetaData, the encoding format is: +// "\xff/keyServers/[[begin]]" := "[[std::vector, std::vector], srcID, destID]", where srcID +// and destID are the source and destination `shard id`, respectively. extern const KeyRangeRef keyServersKeys, keyServersKeyServersKeys; extern const KeyRef keyServersPrefix, keyServersEnd, keyServersKeyServersKey; + +// Used during the transition to the new location metadata format with shard IDs. +// If `SHARD_ENCODE_LOCATION_METADATA` is enabled, any shard that doesn't have a shard ID will be assigned this +// temporary ID, until a permanent ID is assigned to it. 
+extern const UID anonymousShardId; +extern const uint64_t assignedEmptyShardId; const Key keyServersKey(const KeyRef& k); const KeyRef keyServersKey(const KeyRef& k, Arena& arena); const Value keyServersValue(RangeResult result, const std::vector& src, const std::vector& dest = std::vector()); const Value keyServersValue(const std::vector& srcTag, const std::vector& destTag = std::vector()); +const Value keyServersValue(const std::vector& src, + const std::vector& dest, + const UID& srcID, + const UID& destID); // `result` must be the full result of getting serverTagKeys void decodeKeyServersValue(RangeResult result, const ValueRef& value, @@ -67,6 +83,13 @@ void decodeKeyServersValue(std::map const& tag_uid, const ValueRef& value, std::vector& src, std::vector& dest); +void decodeKeyServersValue(RangeResult result, + const ValueRef& value, + std::vector& src, + std::vector& dest, + UID& srcID, + UID& destID, + bool missingIsError = true); extern const KeyRef clusterIdKey; @@ -77,6 +100,13 @@ const Value checkpointValue(const CheckpointMetaData& checkpoint); UID decodeCheckpointKey(const KeyRef& key); CheckpointMetaData decodeCheckpointValue(const ValueRef& value); +// "\xff/dataMoves/[[UID]] := [[DataMoveMetaData]]" +extern const KeyRangeRef dataMoveKeys; +const Key dataMoveKeyFor(UID dataMoveId); +const Value dataMoveValue(const DataMoveMetaData& dataMove); +UID decodeDataMoveKey(const KeyRef& key); +DataMoveMetaData decodeDataMoveValue(const ValueRef& value); + // "\xff/storageCacheServer/[[UID]] := StorageServerInterface" // This will be added by the cache server on initialization and removed by DD // TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future @@ -102,11 +132,16 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector& serve extern const KeyRangeRef serverKeysRange; extern const KeyRef serverKeysPrefix; extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse; +const UID newShardId(const uint64_t physicalShardId, + AssignEmptyRange assignEmptyRange, + UnassignShard unassignShard = UnassignShard::False); const Key serverKeysKey(UID serverID, const KeyRef& keys); const Key serverKeysPrefixFor(UID serverID); UID serverKeysDecodeServer(const KeyRef& key); std::pair serverKeysDecodeServerBegin(const KeyRef& key); bool serverHasKey(ValueRef storedValue); +const Value serverKeysValue(const UID& id); +void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id); extern const KeyRangeRef conflictingKeysRange; extern const ValueRef conflictingKeysTrue, conflictingKeysFalse; diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index babec0e9e3..6159a878a1 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -46,6 +46,7 @@ FDB_DEFINE_BOOLEAN_PARAM(PreferLowerDiskUtil); FDB_DEFINE_BOOLEAN_PARAM(TeamMustHaveShards); FDB_DEFINE_BOOLEAN_PARAM(ForReadBalance); FDB_DEFINE_BOOLEAN_PARAM(PreferLowerReadUtil); +FDB_DEFINE_BOOLEAN_PARAM(FindTeamByServers); class DDTeamCollectionImpl { ACTOR static Future checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self) { @@ -154,6 +155,19 @@ public: return Void(); } + // Find the team with the exact storage servers as req.src. 
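+	// Used when the destination of a restored data move must be matched exactly; if no such team exists, an empty
+	// reply is sent and the requesting relocator will eventually throw data_move_dest_team_not_found.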
+ static void getTeamByServers(DDTeamCollection* self, GetTeamRequest req) { + const std::string servers = TCTeamInfo::serversToString(req.src); + Optional> res; + for (const auto& team : self->teams) { + if (team->getServerIDsStr() == servers) { + res = team; + break; + } + } + req.reply.send(std::make_pair(res, false)); + } + // SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case) // use keys, src, dest, metrics, priority, system load, etc.. to decide... ACTOR static Future getTeam(DDTeamCollection* self, GetTeamRequest req) { @@ -721,19 +735,19 @@ public: bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get()) || containsFailed); - //TraceEvent("TeamHealthChangeDetected", self->distributorId) - // .detail("Team", team->getDesc()) - // .detail("ServersLeft", serversLeft) - // .detail("LastServersLeft", lastServersLeft) - // .detail("AnyUndesired", anyUndesired) - // .detail("LastAnyUndesired", lastAnyUndesired) - // .detail("AnyWrongConfiguration", anyWrongConfiguration) - // .detail("LastWrongConfiguration", lastWrongConfiguration) - // .detail("ContainsWigglingServer", anyWigglingServer) - // .detail("Recheck", recheck) - // .detail("BadTeam", badTeam) - // .detail("LastZeroHealthy", lastZeroHealthy) - // .detail("ZeroHealthyTeam", self->zeroHealthyTeams->get()); + TraceEvent(SevVerbose, "TeamHealthChangeDetected", self->distributorId) + .detail("Team", team->getDesc()) + .detail("ServersLeft", serversLeft) + .detail("LastServersLeft", lastServersLeft) + .detail("AnyUndesired", anyUndesired) + .detail("LastAnyUndesired", lastAnyUndesired) + .detail("AnyWrongConfiguration", anyWrongConfiguration) + .detail("LastWrongConfiguration", lastWrongConfiguration) + .detail("ContainsWigglingServer", anyWigglingServer) + .detail("Recheck", recheck) + .detail("BadTeam", badTeam) + .detail("LastZeroHealthy", lastZeroHealthy) + .detail("ZeroHealthyTeam", self->zeroHealthyTeams->get()); lastReady = self->initialFailureReactionDelay.isReady(); lastZeroHealthy = self->zeroHealthyTeams->get(); @@ -867,6 +881,11 @@ public: std::vector shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary)); + TraceEvent(SevVerbose, "ServerTeamRelocatingShards", self->distributorId) + .detail("Info", team->getDesc()) + .detail("TeamID", team->getTeamID()) + .detail("Shards", shards.size()); + for (int i = 0; i < shards.size(); i++) { // Make it high priority to move keys off failed server or else RelocateShards may never be // addressed @@ -960,6 +979,7 @@ public: } catch (Error& e) { if (logTeamEvents) { TraceEvent("TeamTrackerStopping", self->distributorId) + .errorUnsuppressed(e) .detail("ServerPrimary", self->primary) .detail("Team", team->getDesc()) .detail("Priority", team->getPriority()); @@ -1608,10 +1628,10 @@ public: // could cause us to not store the mutations sent to the short lived storage server. 
if (ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) { bool canRemove = wait(canRemoveStorageServer(tr, serverID)); - // TraceEvent("WaitForAllDataRemoved") - // .detail("Server", serverID) - // .detail("CanRemove", canRemove) - // .detail("Shards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID)); + TraceEvent(SevVerbose, "WaitForAllDataRemoved") + .detail("Server", serverID) + .detail("CanRemove", canRemove) + .detail("Shards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID)); ASSERT_GE(teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID), 0); if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0) { return Void(); @@ -2702,7 +2722,11 @@ public: ACTOR static Future serverGetTeamRequests(DDTeamCollection* self, TeamCollectionInterface tci) { loop { GetTeamRequest req = waitNext(tci.getTeam.getFuture()); - self->addActor.send(self->getTeam(req)); + if (req.findTeamByServers) { + getTeamByServers(self, req); + } else { + self->addActor.send(self->getTeam(req)); + } } } @@ -3134,12 +3158,19 @@ public: .detail("Primary", self->isPrimary()); for (i = 0; i < teams.size(); i++) { const auto& team = teams[i]; + TraceEvent("ServerTeamInfo", self->getDistributorId()) .detail("TeamIndex", i) .detail("Healthy", team->isHealthy()) .detail("TeamSize", team->size()) .detail("MemberIDs", team->getServerIDsStr()) - .detail("Primary", self->isPrimary()); + .detail("Primary", self->isPrimary()) + .detail("TeamID", team->getTeamID()) + .detail( + "Shards", + self->shardsAffectedByTeamFailure + ->getShardsFor(ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary)) + .size()); if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) { wait(yield()); } diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 429b3c4b64..c4e099d6ee 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -52,6 +52,57 @@ #include "flow/actorcompiler.h" // This must be the last #include. 
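+// Checks that a shard read back from the keyServers map is consistent with this in-flight DataMove: the range must
+// be covered by the move, the shard must carry a destination, its destination ID must equal the move's ID, and its
+// destination servers must be among the move's recorded destinations. On any mismatch the move is marked cancelled
+// so that it can be cleaned up and rescheduled.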
+void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority) { + if (!valid) { + if (shard.hasDest && shard.destId != anonymousShardId) { + TraceEvent(SevError, "DataMoveValidationError") + .detail("Range", range) + .detail("Reason", "DataMoveMissing") + .detail("ShardPrimaryDest", describe(shard.primaryDest)) + .detail("ShardRemoteDest", describe(shard.remoteDest)); + } + return; + } + + ASSERT(this->meta.range.contains(range)); + + if (!shard.hasDest) { + TraceEvent(SevError, "DataMoveValidationError") + .detail("Range", range) + .detail("Reason", "ShardMissingDest") + .detail("DataMoveMetaData", this->meta.toString()) + .detail("DataMovePrimaryDest", describe(this->primaryDest)) + .detail("DataMoveRemoteDest", describe(this->remoteDest)); + cancelled = true; + return; + } + + if (shard.destId != this->meta.id) { + TraceEvent(SevError, "DataMoveValidationError") + .detail("Range", range) + .detail("Reason", "DataMoveIDMissMatch") + .detail("DataMoveMetaData", this->meta.toString()) + .detail("ShardMoveID", shard.destId); + cancelled = true; + return; + } + + if (!std::includes( + this->primaryDest.begin(), this->primaryDest.end(), shard.primaryDest.begin(), shard.primaryDest.end()) || + !std::includes( + this->remoteDest.begin(), this->remoteDest.end(), shard.remoteDest.begin(), shard.remoteDest.end())) { + TraceEvent(SevError, "DataMoveValidationError") + .detail("Range", range) + .detail("Reason", "DataMoveDestMissMatch") + .detail("DataMoveMetaData", this->meta.toString()) + .detail("DataMovePrimaryDest", describe(this->primaryDest)) + .detail("DataMoveRemoteDest", describe(this->remoteDest)) + .detail("ShardPrimaryDest", describe(shard.primaryDest)) + .detail("ShardRemoteDest", describe(shard.remoteDest)); + cancelled = true; + } +} + // Read keyservers, return unique set of teams ACTOR Future> getInitialDataDistribution(Database cx, UID distributorId, @@ -68,14 +119,18 @@ ACTOR Future> getInitialDataDistribution(Data state std::map> server_dc; state std::map, std::pair, std::vector>> team_cache; state std::vector> tss_servers; + state int numDataMoves = 0; // Get the server list in its own try/catch block since it modifies result. 
We don't want a subsequent failure // causing entries to be duplicated loop { + numDataMoves = 0; server_dc.clear(); + result->allServers.clear(); + tss_servers.clear(); + team_cache.clear(); succeeded = false; try { - // Read healthyZone value which is later used to determine on/off of failure triggered DD tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); @@ -113,8 +168,6 @@ ACTOR Future> getInitialDataDistribution(Data for (int i = 0; i < workers.get().size(); i++) id_data[workers.get()[i].locality.processId()] = workers.get()[i]; - succeeded = true; - for (int i = 0; i < serverList.get().size(); i++) { auto ssi = decodeServerListValue(serverList.get()[i].value); if (!ssi.isTss()) { @@ -125,6 +178,42 @@ ACTOR Future> getInitialDataDistribution(Data } } + RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!dms.more && dms.size() < CLIENT_KNOBS->TOO_MANY); + for (int i = 0; i < dms.size(); ++i) { + auto dataMove = std::make_shared(decodeDataMoveValue(dms[i].value), true); + const DataMoveMetaData& meta = dataMove->meta; + for (const UID& id : meta.src) { + auto& dc = server_dc[id]; + if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) { + dataMove->remoteSrc.push_back(id); + } else { + dataMove->primarySrc.push_back(id); + } + } + for (const UID& id : meta.dest) { + auto& dc = server_dc[id]; + if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) { + dataMove->remoteDest.push_back(id); + } else { + dataMove->primaryDest.push_back(id); + } + } + std::sort(dataMove->primarySrc.begin(), dataMove->primarySrc.end()); + std::sort(dataMove->remoteSrc.begin(), dataMove->remoteSrc.end()); + std::sort(dataMove->primaryDest.begin(), dataMove->primaryDest.end()); + std::sort(dataMove->remoteDest.begin(), dataMove->remoteDest.end()); + + auto ranges = result->dataMoveMap.intersectingRanges(meta.range); + for (auto& r : ranges) { + ASSERT(!r.value()->valid); + } + result->dataMoveMap.insert(meta.range, std::move(dataMove)); + ++numDataMoves; + } + + succeeded = true; + break; } catch (Error& e) { wait(tr.onError(e)); @@ -153,11 +242,12 @@ ACTOR Future> getInitialDataDistribution(Data succeeded = true; std::vector src, dest, last; + UID srcId, destId; // for each range for (int i = 0; i < keyServers.size() - 1; i++) { - DDShardInfo info(keyServers[i].key); - decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); + decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId); + DDShardInfo info(keyServers[i].key, srcId, destId); if (remoteDcIds.size()) { auto srcIter = team_cache.find(src); if (srcIter == team_cache.end()) { @@ -233,6 +323,14 @@ ACTOR Future> getInitialDataDistribution(Data // a dummy shard at the end with no keys or servers makes life easier for trackInitialShards() result->shards.push_back(DDShardInfo(allKeys.end)); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) { + for (int shard = 0; shard < result->shards.size() - 1; ++shard) { + const DDShardInfo& iShard = result->shards[shard]; + KeyRangeRef keys = KeyRangeRef(iShard.key, result->shards[shard + 1].key); + result->dataMoveMap[keys.begin]->validateShard(iShard, keys); + } + } + // add tss to server list AFTER teams are built for (auto& it : tss_servers) { result->allServers.push_back(it); @@ -438,6 +536,8 @@ static std::set const& normalDDQueueErrors() { if (s.empty()) { s.insert(error_code_movekeys_conflict); 
s.insert(error_code_broken_promise); + s.insert(error_code_data_move_cancelled); + s.insert(error_code_data_move_dest_team_not_found); } return s; } @@ -644,39 +744,79 @@ ACTOR Future dataDistribution(Reference self, state int shard = 0; for (; shard < initData->shards.size() - 1; shard++) { - KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard + 1].key); + const DDShardInfo& iShard = initData->shards[shard]; + KeyRangeRef keys = KeyRangeRef(iShard.key, initData->shards[shard + 1].key); + shardsAffectedByTeamFailure->defineShard(keys); std::vector teams; - teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true)); + teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.primarySrc, true)); if (configuration.usableRegions > 1) { - teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false)); + teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false)); } if (g_network->isSimulated()) { TraceEvent("DDInitShard") .detail("Keys", keys) - .detail("PrimarySrc", describe(initData->shards[shard].primarySrc)) - .detail("RemoteSrc", describe(initData->shards[shard].remoteSrc)) - .detail("PrimaryDest", describe(initData->shards[shard].primaryDest)) - .detail("RemoteDest", describe(initData->shards[shard].remoteDest)); + .detail("PrimarySrc", describe(iShard.primarySrc)) + .detail("RemoteSrc", describe(iShard.remoteSrc)) + .detail("PrimaryDest", describe(iShard.primaryDest)) + .detail("RemoteDest", describe(iShard.remoteDest)) + .detail("SrcID", iShard.srcId) + .detail("DestID", iShard.destId); } shardsAffectedByTeamFailure->moveShard(keys, teams); - if (initData->shards[shard].hasDest) { + if (iShard.hasDest && iShard.destId == anonymousShardId) { // This shard is already in flight. Ideally we should use dest in ShardsAffectedByTeamFailure and // generate a dataDistributionRelocator directly in DataDistributionQueue to track it, but it's // easier to just (with low priority) schedule it for movement. - bool unhealthy = initData->shards[shard].primarySrc.size() != configuration.storageTeamSize; + bool unhealthy = iShard.primarySrc.size() != configuration.storageTeamSize; if (!unhealthy && configuration.usableRegions > 1) { - unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize; + unhealthy = iShard.remoteSrc.size() != configuration.storageTeamSize; } output.send(RelocateShard(keys, unhealthy ? SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY : SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER)); } + wait(yield(TaskPriority::DataDistribution)); } + state KeyRangeMap>::iterator it = initData->dataMoveMap.ranges().begin(); + for (; it != initData->dataMoveMap.ranges().end(); ++it) { + const DataMoveMetaData& meta = it.value()->meta; + if (it.value()->isCancelled() || (it.value()->valid && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) { + RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER); + rs.dataMoveId = meta.id; + rs.cancelled = true; + output.send(rs); + TraceEvent("DDInitScheduledCancelDataMove", self->ddId).detail("DataMove", meta.toString()); + } else if (it.value()->valid) { + TraceEvent(SevDebug, "DDInitFoundDataMove", self->ddId).detail("DataMove", meta.toString()); + ASSERT(meta.range == it.range()); + // TODO: Persist priority in DataMoveMetaData. 
+ RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER); + rs.dataMoveId = meta.id; + rs.dataMove = it.value(); + std::vector teams; + teams.push_back(ShardsAffectedByTeamFailure::Team(rs.dataMove->primaryDest, true)); + if (!rs.dataMove->remoteDest.empty()) { + teams.push_back(ShardsAffectedByTeamFailure::Team(rs.dataMove->remoteDest, false)); + } + + // Since a DataMove could cover more than one keyrange, e.g., during merge, we need to define + // the target shard and restart the shard tracker. + shardsAffectedByTeamFailure->restartShardTracker.send(rs.keys); + shardsAffectedByTeamFailure->defineShard(rs.keys); + + // When restoring a DataMove, the destination team is determined, and hence we need to register + // the data move now, so that team failures can be captured. + shardsAffectedByTeamFailure->moveShard(rs.keys, teams); + output.send(rs); + wait(yield(TaskPriority::DataDistribution)); + } + } + std::vector tcis; Reference> anyZeroHealthyTeams; @@ -859,6 +999,8 @@ static std::set const& normalDataDistributorErrors() { s.insert(error_code_actor_cancelled); s.insert(error_code_please_reboot); s.insert(error_code_movekeys_conflict); + s.insert(error_code_data_move_cancelled); + s.insert(error_code_data_move_dest_team_not_found); } return s; } diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index da783eddc8..299dff4d92 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -64,6 +64,7 @@ struct RelocateData { double startTime; UID randomId; + UID dataMoveId; int workFactor; std::vector src; std::vector completeSources; @@ -71,18 +72,24 @@ struct RelocateData { bool wantsNewServers; bool cancellable; TraceInterval interval; + std::shared_ptr dataMove; RelocateData() : priority(-1), boundaryPriority(-1), healthPriority(-1), reason(RelocateReason::INVALID), startTime(-1), - workFactor(0), wantsNewServers(false), cancellable(false), interval("QueuedRelocation") {} + dataMoveId(anonymousShardId), workFactor(0), wantsNewServers(false), cancellable(false), + interval("QueuedRelocation") {} explicit RelocateData(RelocateShard const& rs) : keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1), healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), reason(rs.reason), startTime(now()), - randomId(deterministicRandom()->randomUniqueID()), workFactor(0), + randomId(deterministicRandom()->randomUniqueID()), dataMoveId(rs.dataMoveId), workFactor(0), wantsNewServers(isMountainChopperPriority(rs.priority) || isValleyFillerPriority(rs.priority) || rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD || rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT), - cancellable(true), interval("QueuedRelocation") {} + cancellable(true), interval("QueuedRelocation"), dataMove(rs.dataMove) { + if (dataMove != nullptr) { + this->src.insert(this->src.end(), dataMove->meta.src.begin(), dataMove->meta.src.end()); + } + } static bool isHealthPriority(int priority) { return priority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || @@ -97,6 +104,8 @@ struct RelocateData { return priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD || priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD; } + bool isRestore() const { return this->dataMove != nullptr; } + bool operator>(const RelocateData& rhs) const { return priority != rhs.priority ? 
priority > rhs.priority @@ -436,13 +445,26 @@ void complete(RelocateData const& relocation, std::map& busymap, completeDest(relocation, destBusymap); } +// Cancells in-flight data moves intersecting with range. +ACTOR Future cancelDataMove(struct DDQueueData* self, KeyRange range, const DDEnabledState* ddEnabledState); + ACTOR Future dataDistributionRelocator(struct DDQueueData* self, RelocateData rd, + Future prevCleanup, const DDEnabledState* ddEnabledState); struct DDQueueData { - ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it. + struct DDDataMove { + DDDataMove() = default; + explicit DDDataMove(UID id) : id(id) {} + bool isValid() const { return id.isValid(); } + + UID id; + Future cancel; + }; + + ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it. UID distributorId; MoveKeysLock lock; Database cx; @@ -454,6 +476,7 @@ struct DDQueueData { FlowLock startMoveKeysParallelismLock; FlowLock finishMoveKeysParallelismLock; + FlowLock cleanUpDataMoveParallelismLock; Reference fetchSourceLock; int activeRelocations; @@ -478,6 +501,7 @@ struct DDQueueData { KeyRangeMap inFlight; // Track all actors that relocates specified keys to a good place; Key: keyRange; Value: actor KeyRangeActorMap inFlightActors; + KeyRangeMap dataMoves; Promise error; PromiseStream dataTransferComplete; @@ -556,6 +580,7 @@ struct DDQueueData { shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes), startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM), finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM), + cleanUpDataMoveParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM), fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0), queuedRelocations(0), bytesWritten(0), teamSize(teamSize), singleRegionTeamSize(singleRegionTeamSize), output(output), input(input), getShardMetrics(getShardMetrics), getTopKMetrics(getTopKMetrics), lastInterval(0), @@ -784,6 +809,10 @@ struct DDQueueData { /*TraceEvent(rrs.interval.end(), mi.id()).detail("Result","Cancelled") .detail("WasFetching", foundActiveFetching).detail("Contained", rd.keys.contains( rrs.keys ));*/ queuedRelocations--; + TraceEvent(SevVerbose, "QueuedRelocationsChanged") + .detail("DataMoveID", rrs.dataMoveId) + .detail("RandomID", rrs.randomId) + .detail("Total", queuedRelocations); finishRelocation(rrs.priority, rrs.healthPriority); } } @@ -812,6 +841,10 @@ struct DDQueueData { .detail("KeyBegin", rrs.keys.begin).detail("KeyEnd", rrs.keys.end) .detail("Priority", rrs.priority).detail("WantsNewServers", rrs.wantsNewServers);*/ queuedRelocations++; + TraceEvent(SevVerbose, "QueuedRelocationsChanged") + .detail("DataMoveID", rrs.dataMoveId) + .detail("RandomID", rrs.randomId) + .detail("Total", queuedRelocations); startRelocation(rrs.priority, rrs.healthPriority); fetchingSourcesQueue.insert(rrs); @@ -834,6 +867,10 @@ struct DDQueueData { .detail("Priority", newData.priority).detail("WantsNewServers", newData.wantsNewServers);*/ queuedRelocations++; + TraceEvent(SevVerbose, "QueuedRelocationsChanged") + .detail("DataMoveID", newData.dataMoveId) + .detail("RandomID", newData.randomId) + .detail("Total", queuedRelocations); startRelocation(newData.priority, newData.healthPriority); foundActiveRelocation = true; } @@ -945,6 +982,7 @@ struct DDQueueData { } if (overlappingInFlight) { + ASSERT(!rd.isRestore()); // logRelocation( rd, 
"SkippingOverlappingInFlight" ); continue; } @@ -963,7 +1001,7 @@ struct DDQueueData { // SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the // queue // FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY - if (!canLaunchSrc(rd, teamSize, singleRegionTeamSize, busymap, cancellableRelocations)) { + if (!rd.isRestore() && !canLaunchSrc(rd, teamSize, singleRegionTeamSize, busymap, cancellableRelocations)) { // logRelocation( rd, "SkippingQueuedRelocation" ); continue; } @@ -974,14 +1012,23 @@ struct DDQueueData { // logRelocation( rd, "LaunchingRelocation" ); //TraceEvent(rd.interval.end(), distributorId).detail("Result","Success"); - queuedRelocations--; - finishRelocation(rd.priority, rd.healthPriority); + if (!rd.isRestore()) { + queuedRelocations--; + TraceEvent(SevVerbose, "QueuedRelocationsChanged") + .detail("DataMoveID", rd.dataMoveId) + .detail("RandomID", rd.randomId) + .detail("Total", queuedRelocations); + finishRelocation(rd.priority, rd.healthPriority); - // now we are launching: remove this entry from the queue of all the src servers - for (int i = 0; i < rd.src.size(); i++) { - ASSERT(queue[rd.src[i]].erase(rd)); + // now we are launching: remove this entry from the queue of all the src servers + for (int i = 0; i < rd.src.size(); i++) { + ASSERT(queue[rd.src[i]].erase(rd)); + } } + Future fCleanup = + CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA ? cancelDataMove(this, rd.keys, ddEnabledState) : Void(); + // If there is a job in flight that wants data relocation which we are about to cancel/modify, // make sure that we keep the relocation intent for the job that we launch auto f = inFlight.intersectingRanges(rd.keys); @@ -1000,12 +1047,29 @@ struct DDQueueData { for (int r = 0; r < ranges.size(); r++) { RelocateData& rrs = inFlight.rangeContaining(ranges[r].begin)->value(); rrs.keys = ranges[r]; + if (rd.keys == ranges[r] && rd.isRestore()) { + ASSERT(rd.dataMove != nullptr); + ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA); + rrs.dataMoveId = rd.dataMove->meta.id; + } else { + ASSERT_WE_THINK(!rd.isRestore()); // Restored data move should not overlap. + // TODO(psm): The shard id is determined by DD. + rrs.dataMove.reset(); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + rrs.dataMoveId = deterministicRandom()->randomUniqueID(); + } else { + rrs.dataMoveId = anonymousShardId; + } + } launch(rrs, busymap, singleRegionTeamSize); activeRelocations++; + TraceEvent(SevVerbose, "InFlightRelocationChange") + .detail("Launch", rrs.dataMoveId) + .detail("Total", activeRelocations); startRelocation(rrs.priority, rrs.healthPriority); // Start the actor that relocates data in the rrs.keys - inFlightActors.insert(rrs.keys, dataDistributionRelocator(this, rrs, ddEnabledState)); + inFlightActors.insert(rrs.keys, dataDistributionRelocator(this, rrs, fCleanup, ddEnabledState)); } // logRelocation( rd, "LaunchedRelocation" ); @@ -1048,8 +1112,58 @@ struct DDQueueData { for (auto& id : ids) lastAsSource[id] = t; } + + // Schedules cancellation of a data move. 
+ void enqueueCancelledDataMove(UID dataMoveId, KeyRange range, const DDEnabledState* ddEnabledState) { + std::vector> cleanup; + auto f = this->dataMoves.intersectingRanges(range); + for (auto it = f.begin(); it != f.end(); ++it) { + if (it->value().isValid()) { + TraceEvent(SevError, "DDEnqueueCancelledDataMoveConflict", this->distributorId) + .detail("DataMoveID", dataMoveId) + .detail("CancelledRange", range) + .detail("ConflictingDataMoveID", it->value().id) + .detail("ConflictingRange", KeyRangeRef(it->range().begin, it->range().end)); + return; + } + } + + DDQueueData::DDDataMove dataMove(dataMoveId); + dataMove.cancel = cleanUpDataMove( + this->cx, dataMoveId, this->lock, &this->cleanUpDataMoveParallelismLock, range, ddEnabledState); + this->dataMoves.insert(range, dataMove); + TraceEvent(SevInfo, "DDEnqueuedCancelledDataMove", this->distributorId) + .detail("DataMoveID", dataMoveId) + .detail("Range", range); + } }; +ACTOR Future cancelDataMove(struct DDQueueData* self, KeyRange range, const DDEnabledState* ddEnabledState) { + std::vector> cleanup; + auto f = self->dataMoves.intersectingRanges(range); + for (auto it = f.begin(); it != f.end(); ++it) { + if (!it->value().isValid()) { + continue; + } + KeyRange keys = KeyRangeRef(it->range().begin, it->range().end); + TraceEvent(SevInfo, "DDQueueCancelDataMove", self->distributorId) + .detail("DataMoveID", it->value().id) + .detail("DataMoveRange", keys) + .detail("Range", range); + if (!it->value().cancel.isValid()) { + it->value().cancel = cleanUpDataMove( + self->cx, it->value().id, self->lock, &self->cleanUpDataMoveParallelismLock, keys, ddEnabledState); + } + cleanup.push_back(it->value().cancel); + } + wait(waitForAll(cleanup)); + auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(range); + if (!ranges.empty()) { + self->dataMoves.insert(KeyRangeRef(ranges.front().begin, ranges.back().end), DDQueueData::DDDataMove()); + } + return Void(); +} + static std::string destServersString(std::vector, bool>> const& bestTeams) { std::stringstream ss; @@ -1064,7 +1178,10 @@ static std::string destServersString(std::vector dataDistributionRelocator(DDQueueData* self, RelocateData rd, const DDEnabledState* ddEnabledState) { +ACTOR Future dataDistributionRelocator(DDQueueData* self, + RelocateData rd, + Future prevCleanup, + const DDEnabledState* ddEnabledState) { state Promise errorOut(self->error); state TraceInterval relocateShardInterval("RelocateShard"); state PromiseStream dataTransferComplete(self->dataTransferComplete); @@ -1101,6 +1218,29 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, self->suppressIntervals = 0; } + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin); + ASSERT(inFlightRange.range() == rd.keys); + ASSERT(inFlightRange.value().randomId == rd.randomId); + ASSERT(inFlightRange.value().dataMoveId == rd.dataMoveId); + inFlightRange.value().cancellable = false; + + wait(prevCleanup); + + auto f = self->dataMoves.intersectingRanges(rd.keys); + for (auto it = f.begin(); it != f.end(); ++it) { + KeyRangeRef kr(it->range().begin, it->range().end); + const UID mId = it->value().id; + if (mId.isValid() && mId != rd.dataMoveId) { + TraceEvent("DDRelocatorConflictingDataMove", distributorId) + .detail("CurrentDataMoveID", rd.dataMoveId) + .detail("DataMoveID", mId) + .detail("Range", kr); + } + } + self->dataMoves.insert(rd.keys, DDQueueData::DDDataMove(rd.dataMoveId)); + } + state StorageMetrics metrics = 
wait(brokenPromiseToNever(self->getShardMetrics.getReply(GetMetricsRequest(rd.keys)))); @@ -1112,6 +1252,7 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, loop { state int tciIndex = 0; state bool foundTeams = true; + state bool bestTeamReady = false; anyHealthy = false; allHealthy = true; anyWithSource = false; @@ -1119,58 +1260,83 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, bestTeams.clear(); // Get team from teamCollections in different DCs and find the best one while (tciIndex < self->teamCollections.size()) { - double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY; - if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || - rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) - inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY; - if (rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || - rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || - rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) - inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT; - - auto req = GetTeamRequest(WantNewServers(rd.wantsNewServers), - WantTrueBest(isValleyFillerPriority(rd.priority)), - PreferLowerDiskUtil::True, - TeamMustHaveShards::False, - ForReadBalance(rd.reason == RelocateReason::REBALANCE_READ), - PreferLowerReadUtil::True, - inflightPenalty); - - req.src = rd.src; - req.completeSources = rd.completeSources; - - // bestTeam.second = false if the bestTeam in the teamCollection (in the DC) does not have any - // server that hosts the relocateData. This is possible, for example, in a fearless configuration - // when the remote DC is just brought up. - Future>, bool>> fbestTeam = - brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)); - state bool bestTeamReady = fbestTeam.isReady(); - std::pair>, bool> bestTeam = wait(fbestTeam); - if (tciIndex > 0 && !bestTeamReady) { - // self->shardsAffectedByTeamFailure->moveShard must be called without any waits after getting - // the destination team or we could miss failure notifications for the storage servers in the - // destination team - TraceEvent("BestTeamNotReady"); - foundTeams = false; - break; - } - // If a DC has no healthy team, we stop checking the other DCs until - // the unhealthy DC is healthy again or is excluded. - if (!bestTeam.first.present()) { - foundTeams = false; - break; - } - if (!bestTeam.first.get()->isHealthy()) { - allHealthy = false; - } else { + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && rd.isRestore()) { + auto req = GetTeamRequest(tciIndex == 0 ? rd.dataMove->primaryDest : rd.dataMove->remoteDest); + Future>, bool>> fbestTeam = + brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)); + bestTeamReady = fbestTeam.isReady(); + std::pair>, bool> bestTeam = wait(fbestTeam); + if (tciIndex > 0 && !bestTeamReady) { + // self->shardsAffectedByTeamFailure->moveShard must be called without any waits after + // getting the destination team or we could miss failure notifications for the storage + // servers in the destination team + TraceEvent("BestTeamNotReady") + .detail("TeamCollectionIndex", tciIndex) + .detail("RestoreDataMoveForDest", + describe(tciIndex == 0 ? 
rd.dataMove->primaryDest : rd.dataMove->remoteDest)); + foundTeams = false; + break; + } + if (!bestTeam.first.present() || !bestTeam.first.get()->isHealthy()) { + foundTeams = false; + break; + } anyHealthy = true; - } + bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second); + } else { + double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY; + if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || + rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) + inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY; + if (rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || + rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || + rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) + inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT; - if (bestTeam.second) { - anyWithSource = true; - } + auto req = GetTeamRequest(WantNewServers(rd.wantsNewServers), + WantTrueBest(isValleyFillerPriority(rd.priority)), + PreferLowerDiskUtil::True, + TeamMustHaveShards::False, + ForReadBalance(rd.reason == RelocateReason::REBALANCE_READ), + PreferLowerReadUtil::True, + inflightPenalty); - bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second); + req.src = rd.src; + req.completeSources = rd.completeSources; + + // bestTeam.second = false if the bestTeam in the teamCollection (in the DC) does not have any + // server that hosts the relocateData. This is possible, for example, in a fearless + // configuration when the remote DC is just brought up. + Future>, bool>> fbestTeam = + brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)); + bestTeamReady = fbestTeam.isReady(); + std::pair>, bool> bestTeam = wait(fbestTeam); + if (tciIndex > 0 && !bestTeamReady) { + // self->shardsAffectedByTeamFailure->moveShard must be called without any waits after + // getting the destination team or we could miss failure notifications for the storage + // servers in the destination team + TraceEvent("BestTeamNotReady"); + foundTeams = false; + break; + } + // If a DC has no healthy team, we stop checking the other DCs until + // the unhealthy DC is healthy again or is excluded. + if (!bestTeam.first.present()) { + foundTeams = false; + break; + } + if (!bestTeam.first.get()->isHealthy()) { + allHealthy = false; + } else { + anyHealthy = true; + } + + if (bestTeam.second) { + anyWithSource = true; + } + + bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second); + } tciIndex++; } // once we've found healthy candidate teams, make sure they're not overloaded with outstanding moves @@ -1204,6 +1370,9 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, .detail("TeamCollectionId", tciIndex) .detail("AnyDestOverloaded", anyDestOverloaded) .detail("NumOfTeamCollections", self->teamCollections.size()); + if (rd.isRestore() && stuckCount > 50) { + throw data_move_dest_team_not_found(); + } wait(delay(SERVER_KNOBS->BEST_TEAM_STUCK_DELAY, TaskPriority::DataDistributionLaunch)); } @@ -1225,6 +1394,7 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, auto& serverIds = bestTeams[i].first->getServerIDs(); destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(serverIds, i == 0)); + // TODO(psm): Make DataMoveMetaData aware of the two-step data move optimization. if (allHealthy && anyWithSource && !bestTeams[i].second) { // When all servers in bestTeams[i] do not hold the shard (!bestTeams[i].second), it indicates // the bestTeams[i] is in a new DC where data has not been replicated to. 
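To recap the restore branch above: the destination team is not chosen by the best-team logic but looked up verbatim from the persisted metadata. A minimal sketch of that lookup, assuming (as implied but not shown in this excerpt) that the GetTeamRequest(servers) constructor populates req.src and flags the request as findTeamByServers:

    // Illustrative sketch only; not part of the patch.
    GetTeamRequest req(rd.dataMove->primaryDest); // exact servers recorded in DataMoveMetaData::dest
    std::pair<Optional<Reference<IDataDistributionTeam>>, bool> team =
        wait(brokenPromiseToNever(self->teamCollections[0].getTeam.getReply(req)));
    if (!team.first.present() || !team.first.get()->isHealthy()) {
        // No healthy team matches the recorded destination; after repeated retries (stuckCount > 50)
        // the relocator throws data_move_dest_team_not_found and the move is cancelled via cancelDataMove().
    }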
@@ -1262,7 +1432,9 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, .detail("DestTeamSize", totalIds); } - self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams); + if (!rd.isRestore()) { + self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams); + } // FIXME: do not add data in flight to servers that were already in the src. healthyDestinations.addDataInFlightToTeam(+metrics.bytes); @@ -1296,6 +1468,7 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, state Promise dataMovementComplete; // Move keys from source to destination by changing the serverKeyList and keyServerList system keys state Future doMoveKeys = moveKeys(self->cx, + rd.dataMoveId, rd.keys, destIds, healthyIds, @@ -1305,7 +1478,8 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, &self->finishMoveKeysParallelismLock, self->teamCollections.size() > 1, relocateShardInterval.pairID, - ddEnabledState); + ddEnabledState, + CancelConflictingDataMoves::False); state Future pollHealth = signalledTransferComplete ? Never() : delay(SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch); @@ -1319,6 +1493,7 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, extraIds.clear(); ASSERT(totalIds == destIds.size()); // Sanity check the destIDs before we move keys doMoveKeys = moveKeys(self->cx, + rd.dataMoveId, rd.keys, destIds, healthyIds, @@ -1328,9 +1503,20 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, &self->finishMoveKeysParallelismLock, self->teamCollections.size() > 1, relocateShardInterval.pairID, - ddEnabledState); + ddEnabledState, + CancelConflictingDataMoves::False); } else { self->fetchKeysComplete.insert(rd); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(rd.keys); + if (ranges.size() == 1 && static_cast(ranges[0]) == rd.keys && + ranges[0].value.id == rd.dataMoveId && !ranges[0].value.cancel.isValid()) { + self->dataMoves.insert(rd.keys, DDQueueData::DDDataMove()); + TraceEvent(SevVerbose, "DequeueDataMoveOnSuccess", self->distributorId) + .detail("DataMoveID", rd.dataMoveId) + .detail("DataMoveRange", rd.keys); + } + } break; } } @@ -1423,12 +1609,13 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, } } } catch (Error& e) { + state Error err = e; TraceEvent(relocateShardInterval.end(), distributorId) - .errorUnsuppressed(e) + .errorUnsuppressed(err) .detail("Duration", now() - startTime); if (now() - startTime > 600) { TraceEvent(SevWarnAlways, "RelocateShardTooLong") - .errorUnsuppressed(e) + .errorUnsuppressed(err) .detail("Duration", now() - startTime) .detail("Dest", describe(destIds)) .detail("Src", describe(rd.src)); @@ -1438,12 +1625,16 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, relocationComplete.send(rd); - if (e.code() != error_code_actor_cancelled) { + if (err.code() == error_code_data_move_dest_team_not_found) { + wait(cancelDataMove(self, rd.keys, ddEnabledState)); + } + + if (err.code() != error_code_actor_cancelled && err.code() != error_code_data_move_cancelled) { if (errorOut.canBeSet()) { - errorOut.sendError(e); + errorOut.sendError(err); } } - throw; + throw err; } } @@ -2036,10 +2227,18 @@ ACTOR Future dataDistributionQueue(Database cx, choose { when(RelocateShard rs = waitNext(self.input)) { - bool wasEmpty = serversToLaunchFrom.empty(); - self.queueRelocation(rs, serversToLaunchFrom); - 
if (wasEmpty && !serversToLaunchFrom.empty()) - launchQueuedWorkTimeout = delay(0, TaskPriority::DataDistributionLaunch); + if (rs.isRestore()) { + ASSERT(rs.dataMove != nullptr); + ASSERT(rs.dataMoveId.isValid()); + self.launchQueuedWork(RelocateData(rs), ddEnabledState); + } else if (rs.cancelled) { + self.enqueueCancelledDataMove(rs.dataMoveId, rs.keys, ddEnabledState); + } else { + bool wasEmpty = serversToLaunchFrom.empty(); + self.queueRelocation(rs, serversToLaunchFrom); + if (wasEmpty && !serversToLaunchFrom.empty()) + launchQueuedWorkTimeout = delay(0, TaskPriority::DataDistributionLaunch); + } } when(wait(launchQueuedWorkTimeout)) { self.launchQueuedWork(serversToLaunchFrom, ddEnabledState); @@ -2059,6 +2258,10 @@ ACTOR Future dataDistributionQueue(Database cx, } when(RelocateData done = waitNext(self.relocationComplete.getFuture())) { self.activeRelocations--; + TraceEvent(SevVerbose, "InFlightRelocationChange") + .detail("Complete", done.dataMoveId) + .detail("IsRestore", done.isRestore()) + .detail("Total", self.activeRelocations); self.finishRelocation(done.priority, done.healthPriority); self.fetchKeysComplete.erase(done); // self.logRelocation( done, "ShardRelocatorDone" ); @@ -2123,7 +2326,8 @@ ACTOR Future dataDistributionQueue(Database cx, } catch (Error& e) { if (e.code() != error_code_broken_promise && // FIXME: Get rid of these broken_promise errors every time we // are killed by the master dying - e.code() != error_code_movekeys_conflict) + e.code() != error_code_movekeys_conflict && e.code() != error_code_data_move_cancelled && + e.code() != error_code_data_move_dest_team_not_found) TraceEvent(SevError, "DataDistributionQueueError", distributorId).error(e); throw e; } diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index 55b797290d..b4c30fba1f 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -712,19 +712,18 @@ ACTOR Future shardEvaluator(DataDistributionTracker* self, } } - /*TraceEvent("EdgeCaseTraceShardEvaluator", self->distributorId) - // .detail("TrackerId", trackerID) - .detail("BeginKey", keys.begin.printableNonNull()) - .detail("EndKey", keys.end.printableNonNull()) - .detail("ShouldSplit", shouldSplit) - .detail("ShouldMerge", shouldMerge) - .detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough()) - .detail("CurrentMetrics", stats.toString()) - .detail("ShardBoundsMaxBytes", shardBounds.max.bytes) - .detail("ShardBoundsMinBytes", shardBounds.min.bytes) - .detail("WriteBandwitdhStatus", bandwidthStatus) - .detail("SplitBecauseHighWriteBandWidth", ( bandwidthStatus == BandwidthStatusHigh && keys.begin < - keyServersKeys.begin ) ? "Yes" :"No");*/ + // TraceEvent("EdgeCaseTraceShardEvaluator", self->distributorId) + // .detail("BeginKey", keys.begin.printable()) + // .detail("EndKey", keys.end.printable()) + // .detail("ShouldSplit", shouldSplit) + // .detail("ShouldMerge", shouldMerge) + // .detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough()) + // .detail("CurrentMetrics", stats.toString()) + // .detail("ShardBoundsMaxBytes", shardBounds.max.bytes) + // .detail("ShardBoundsMinBytes", shardBounds.min.bytes) + // .detail("WriteBandwitdhStatus", bandwidthStatus) + // .detail("SplitBecauseHighWriteBandWidth", + // (bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin) ? 
"Yes" : "No"); if (!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) { onChange = onChange || shardMerger(self, keys, shardSize); @@ -1050,6 +1049,9 @@ ACTOR Future dataDistributionTracker(Reference in self.sizeChanges.add(fetchShardMetricsList(&self, req)); } when(wait(self.sizeChanges.getResult())) {} + when(KeyRange req = waitNext(self.shardsAffectedByTeamFailure->restartShardTracker.getFuture())) { + restartShardTrackers(&self, req); + } } } catch (Error& e) { TraceEvent(SevError, "DataDistributionTrackerError", self.distributorId).error(e); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index bb51451581..9534b6c9d4 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -31,6 +31,131 @@ #include "flow/actorcompiler.h" // This must be the last #include. +FDB_DEFINE_BOOLEAN_PARAM(CancelConflictingDataMoves); + +namespace { +struct Shard { + Shard() = default; + Shard(KeyRangeRef range, const UID& id) : range(range), id(id) {} + + KeyRange range; + UID id; +}; + +// Unassigns keyrange `range` from server `ssId`, except ranges in `shards`. +// Note: krmSetRangeCoalescing() doesn't work in this case since each shard is assigned an ID. +ACTOR Future unassignServerKeys(Transaction* tr, UID ssId, KeyRange range, std::vector shards, UID logId) { + state Key mapPrefix = serverKeysPrefixFor(ssId); + if (shards.empty()) { + wait(krmSetRangeCoalescing(tr, mapPrefix, range, allKeys, serverKeysFalse)); + return Void(); + } + + state KeyRange withPrefix = + KeyRangeRef(mapPrefix.toString() + range.begin.toString(), mapPrefix.toString() + range.end.toString()); + state KeyRange maxWithPrefix = + KeyRangeRef(mapPrefix.toString() + allKeys.begin.toString(), mapPrefix.toString() + allKeys.end.toString()); + + state std::vector> keys; + keys.push_back( + tr->getRange(lastLessThan(withPrefix.begin), firstGreaterOrEqual(withPrefix.begin), 1, Snapshot::True)); + keys.push_back( + tr->getRange(lastLessOrEqual(withPrefix.end), firstGreaterThan(withPrefix.end) + 1, 2, Snapshot::True)); + wait(waitForAll(keys)); + + // Determine how far to extend this range at the beginning + auto beginRange = keys[0].get(); + bool hasBegin = beginRange.size() > 0 && beginRange[0].key.startsWith(mapPrefix); + Value beginValue = hasBegin ? beginRange[0].value : serverKeysFalse; + + Key beginKey = withPrefix.begin; + Value value = range.begin == shards[0].range.begin ? serverKeysValue(shards[0].id) : serverKeysFalse; + if (beginValue == value) { + bool outsideRange = !hasBegin || beginRange[0].key < maxWithPrefix.begin; + beginKey = outsideRange ? 
maxWithPrefix.begin : beginRange[0].key; + } + + std::vector kvs; + if (beginKey < withPrefix.begin) { + kvs.push_back(KeyValueRef(beginKey, value)); + } + Key preEnd = range.begin; + for (int i = 0; i < shards.size(); ++i) { + const Shard& shard = shards[i]; + if (shard.range.begin > preEnd && (kvs.empty() || kvs.back().value != serverKeysFalse)) { + kvs.push_back(KeyValueRef(preEnd.withPrefix(mapPrefix), serverKeysFalse)); + } + preEnd = shard.range.end; + Value cv = serverKeysValue(shard.id); + if (kvs.empty() || cv != kvs.back().value) { + kvs.push_back(KeyValueRef(shard.range.begin.withPrefix(mapPrefix), cv)); + } + } + if (range.end > preEnd) { + kvs.push_back(KeyValueRef(preEnd.withPrefix(mapPrefix), serverKeysFalse)); + } + + // Determine how far to extend this range at the end + auto endRange = keys[1].get(); + bool hasEnd = endRange.size() >= 1 && endRange[0].key.startsWith(mapPrefix) && endRange[0].key <= withPrefix.end; + bool hasNext = (endRange.size() == 2 && endRange[1].key.startsWith(mapPrefix)) || + (endRange.size() == 1 && withPrefix.end < endRange[0].key && endRange[0].key.startsWith(mapPrefix)); + Value existingValue = hasEnd ? endRange[0].value : serverKeysFalse; + + Key endKey; + Value endValue; + const bool valueMatches = kvs.back().value == existingValue; + + // Case 1: Coalesce completely with the following range + if (hasNext && endRange.back().key <= maxWithPrefix.end && valueMatches) { + endKey = endRange.back().key; + endValue = endRange.back().value; + } + + // Case 2: Coalesce with the following range only up to the end of allKeys + else if (valueMatches) { + endKey = maxWithPrefix.end; + endValue = existingValue; + } + + // Case 3: Don't coalesce + else { + endKey = withPrefix.end; + endValue = existingValue; + } + + kvs.push_back(KeyValueRef(endKey, endValue)); + + for (int i = 0; i < kvs.size(); ++i) { + TraceEvent(SevDebug, "UnassignServerKeys", logId) + .detail("SSID", ssId) + .detail("Range", range) + .detail("Point", kvs[i]); + } + + KeyRange conflictRange = KeyRangeRef(hasBegin ? beginRange[0].key : mapPrefix, withPrefix.begin); + if (!conflictRange.empty()) { + tr->addReadConflictRange(conflictRange); + } + conflictRange = KeyRangeRef(hasEnd ? endRange[0].key : mapPrefix, + hasNext ? keyAfter(endRange.end()[-1].key) : strinc(mapPrefix)); + if (!conflictRange.empty()) { + tr->addReadConflictRange(conflictRange); + } + + tr->clear(KeyRangeRef(beginKey, endKey)); + + for (int i = 0; i < kvs.size() - 1; ++i) { + ASSERT(kvs[i].value != kvs[i + 1].value || kvs[i + 1].key.removePrefix(mapPrefix) == allKeys.end); + tr->set(kvs[i].key, kvs[i].value); + tr->set(kvs[i + 1].key, kvs[i + 1].value); + } + + return Void(); +} + +} // namespace + bool DDEnabledState::isDDEnabled() const { return ddEnabled; } @@ -163,6 +288,86 @@ ACTOR Future> checkReadWrite(Future> f return Optional(uid); } +// Cleans up dest servers of a single shard, and unassigns the keyrange from the dest servers if necessary. 
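A rough, standalone model (plain std:: types and made-up names such as "ss1"/"shard-A"; not the real FDB key encodings) of the cleanup that the function below performs on one shard whose pending destination is the anonymous shard: the pending destination is dropped from keyServers, and the range is unassigned from destination servers that are not also sources.

    #include <algorithm>
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    struct ShardAssignment {
        std::vector<std::string> src, dest;
        std::string srcId, destId;
    };

    int main() {
        // One shard as found in keyServers: sources plus a pending (anonymous) destination.
        ShardAssignment shard{ { "ss1", "ss2" }, { "ss2", "ss3" }, "shard-A", "anonymousShard" };
        // Whether each server's serverKeys map currently assigns it this range.
        std::map<std::string, bool> serverOwnsRange = { { "ss1", true }, { "ss2", true }, { "ss3", true } };

        // keyServers: keep only the source assignment; the pending move is dropped.
        std::vector<std::string> oldDest;
        std::swap(oldDest, shard.dest);
        shard.destId.clear();

        // serverKeys: unassign the range from old destinations that are not also sources.
        for (const std::string& ss : oldDest) {
            if (std::find(shard.src.begin(), shard.src.end(), ss) == shard.src.end())
                serverOwnsRange[ss] = false;
        }

        for (const auto& [ss, owns] : serverOwnsRange)
            std::cout << ss << (owns ? " keeps" : " drops") << " the range\n";
    }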
+ACTOR Future cleanUpSingleShardDataMove(Database occ, + KeyRange keys, + MoveKeysLock lock, + FlowLock* cleanUpDataMoveParallelismLock, + UID dataMoveId, + const DDEnabledState* ddEnabledState) { + ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA); + TraceEvent(SevInfo, "CleanUpSingleShardDataMoveBegin", dataMoveId).detail("Range", keys); + + loop { + state Transaction tr(occ); + + try { + tr.trState->taskID = TaskPriority::MoveKeys; + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + state RangeResult currentShards = wait(krmGetRanges(&tr, + keyServersPrefix, + keys, + SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, + SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT)); + ASSERT(!currentShards.empty() && !currentShards.more); + + state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + + if (KeyRangeRef(currentShards[0].key, currentShards[1].key) != keys) { + throw operation_cancelled(); + } + + std::vector src; + std::vector dest; + UID srcId, destId; + decodeKeyServersValue(UIDtoTagMap, currentShards[0].value, src, dest, srcId, destId); + + if (dest.empty() || destId != anonymousShardId) { + return Void(); + } + + TraceEvent(SevInfo, "CleanUpSingleShardDataMove", dataMoveId) + .detail("Range", keys) + .detail("Src", describe(src)) + .detail("Dest", describe(dest)) + .detail("SrcID", srcId) + .detail("DestID", destId) + .detail("ReadVersion", tr.getReadVersion().get()); + + krmSetPreviouslyEmptyRange( + &tr, keyServersPrefix, keys, keyServersValue(UIDtoTagMap, src, {}), currentShards[1].value); + + std::vector> actors; + for (const auto& uid : dest) { + if (std::find(src.begin(), src.end(), uid) == src.end()) { + actors.push_back( + krmSetRangeCoalescing(&tr, serverKeysPrefixFor(uid), keys, allKeys, serverKeysFalse)); + } + } + + wait(waitForAll(actors)); + + wait(tr.commit()); + + break; + } catch (Error& e) { + state Error err = e; + wait(tr.onError(e)); + + TraceEvent(SevWarn, "CleanUpSingleShardDataMoveRetriableError", dataMoveId) + .error(err) + .detail("Range", keys); + } + } + + TraceEvent(SevInfo, "CleanUpSingleShardDataMoveEnd", dataMoveId).detail("Range", keys); + + return Void(); +} + Future removeOldDestinations(Reference tr, UID oldDest, VectorRef shards, @@ -1003,6 +1208,587 @@ ACTOR static Future finishMoveKeys(Database occ, return Void(); } +// keyServer: map from keys to destination servers. +// serverKeys: two-dimension map: [servers][keys], value is the servers' state of having the keys: active(not-have), +// complete(already has), ""(). +// Set keyServers[keys].dest = servers Set serverKeys[servers][keys] = dataMoveId for each +// subrange of keys. +// Set dataMoves[dataMoveId] = DataMoveMetaData. 
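To make the location-metadata layout described in the comment above concrete, here is a small standalone sketch (std:: containers and illustrative names only; the real values are versioned binary encodings stored under system keys, and the real DataMoveMetaData carries more fields) of the three maps that startMoveShards updates for a range being moved:

    #include <iostream>
    #include <map>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    // keyServers: for each range, who serves it now and who it is moving to.
    struct Assignment {
        std::vector<std::string> src, dest;
        std::string srcId, destId; // physical shard id / data move id
    };

    int main() {
        std::map<std::string, Assignment> keyServers;                          // range begin -> assignment
        std::map<std::pair<std::string, std::string>, std::string> serverKeys; // (server, range begin) -> shard id
        std::map<std::string, std::set<std::string>> dataMoves;                // data move id -> dest servers

        // What one pass records for a range beginning at "m" moving to {ss4, ss5}:
        const std::string rangeBegin = "m";
        const std::vector<std::string> dest = { "ss4", "ss5" };
        const std::string dataMoveId = "move-42";

        keyServers[rangeBegin] = { { "ss1", "ss2" }, dest, "shard-A", dataMoveId }; // keyServers[keys].dest = servers
        for (const std::string& ss : dest)
            serverKeys[{ ss, rangeBegin }] = dataMoveId;                            // serverKeys[server][keys] = dataMoveId
        dataMoves[dataMoveId].insert(dest.begin(), dest.end());                     // dataMoves[dataMoveId] = metadata

        std::cout << keyServers.size() << " range(s), " << serverKeys.size() << " serverKeys entries, "
                  << dataMoves[dataMoveId].size() << " dest server(s) recorded\n";
    }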
+ACTOR static Future startMoveShards(Database occ, + UID dataMoveId, + KeyRange keys, + std::vector servers, + MoveKeysLock lock, + FlowLock* startMoveKeysLock, + UID relocationIntervalId, + const DDEnabledState* ddEnabledState, + CancelConflictingDataMoves cancelConflictingDataMoves) { + ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA); + state Future warningLogger = logWarningAfter("StartMoveShardsTooLong", 600, servers); + + wait(startMoveKeysLock->take(TaskPriority::DataDistributionLaunch)); + state FlowLock::Releaser releaser(*startMoveKeysLock); + state DataMoveMetaData dataMove; + + TraceEvent(SevDebug, "StartMoveShardsBegin", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("TargetRange", keys); + + try { + state Key begin = keys.begin; + state KeyRange currentKeys = keys; + state int maxRetries = 0; + state bool complete = false; + + loop { + state Transaction tr(occ); + complete = false; + + try { + // Keep track of old dests that may need to have ranges removed from serverKeys + state std::set oldDests; + + // Keep track of shards for all src servers so that we can preserve their values in serverKeys + state std::unordered_map> physicalShardMap; + + tr.trState->taskID = TaskPriority::MoveKeys; + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + wait(checkMoveKeysLock(&tr, lock, ddEnabledState)); + + Optional val = wait(tr.get(dataMoveKeyFor(dataMoveId))); + if (val.present()) { + DataMoveMetaData dmv = decodeDataMoveValue(val.get()); // dmv: Data move value. + dataMove = dmv; + TraceEvent(SevVerbose, "StartMoveShardsFoundDataMove", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("DataMove", dataMove.toString()); + ASSERT(dataMove.range.begin == keys.begin); + if (dataMove.getPhase() == DataMoveMetaData::Deleting) { + TraceEvent(SevVerbose, "StartMoveShardsDataMove", relocationIntervalId) + .detail("DataMoveBeingDeleted", dataMoveId); + throw data_move_cancelled(); + } + if (dataMove.getPhase() == DataMoveMetaData::Running) { + TraceEvent(SevVerbose, "StartMoveShardsDataMove", relocationIntervalId) + .detail("DataMoveAlreadyCommitted", dataMoveId); + ASSERT(keys == dataMove.range); + return Void(); + } + begin = dataMove.range.end; + } else { + dataMove.id = dataMoveId; + TraceEvent(SevVerbose, "StartMoveKeysNewDataMove", relocationIntervalId) + .detail("DataMoveRange", keys) + .detail("DataMoveID", dataMoveId); + } + + std::vector>> serverListEntries; + serverListEntries.reserve(servers.size()); + for (int s = 0; s < servers.size(); s++) { + serverListEntries.push_back(tr.get(serverListKeyFor(servers[s]))); + } + + std::vector> serverListValues = wait(getAll(serverListEntries)); + for (int s = 0; s < serverListValues.size(); s++) { + if (!serverListValues[s].present()) { + // Attempt to move onto a server that isn't in serverList (removed or never added to the + // database) This can happen (why?) and is handled by the data distribution algorithm + // FIXME: Answer why this can happen? + // TODO(psm): Mark the data move as 'deleting'. 
+ throw move_to_removed_server(); + } + } + + currentKeys = KeyRangeRef(begin, keys.end); + state std::vector> actors; + + if (!currentKeys.empty()) { + const int rowLimit = SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT; + const int byteLimit = SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT; + state RangeResult old = wait(krmGetRanges(&tr, keyServersPrefix, currentKeys, rowLimit, byteLimit)); + + state Key endKey = old.back().key; + currentKeys = KeyRangeRef(currentKeys.begin, endKey); + + // Check that enough servers for each shard are in the correct state + state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + + // For each intersecting range, update keyServers[range] dest to be servers and clear existing dest + // servers from serverKeys + state int oldIndex = 0; + for (; oldIndex < old.size() - 1; ++oldIndex) { + state KeyRangeRef rangeIntersectKeys(old[oldIndex].key, old[oldIndex + 1].key); + state std::vector src; + state std::vector dest; + state UID srcId; + state UID destId; + decodeKeyServersValue(UIDtoTagMap, old[oldIndex].value, src, dest, srcId, destId); + TraceEvent(SevVerbose, "StartMoveShardsProcessingShard", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("Range", rangeIntersectKeys) + .detail("OldSrc", describe(src)) + .detail("OldDest", describe(dest)) + .detail("SrcID", srcId) + .detail("DestID", destId) + .detail("ReadVersion", tr.getReadVersion().get()); + + if (destId.isValid()) { + TraceEvent(SevWarn, "StartMoveShardsDestIDExist", relocationIntervalId) + .detail("Range", rangeIntersectKeys) + .detail("DataMoveID", dataMoveId) + .detail("DestID", destId) + .log(); + ASSERT(!dest.empty()); + + if (destId == dataMoveId) { + TraceEvent(SevWarn, "StartMoveShardsRangeAlreadyCommitted", relocationIntervalId) + .detail("Range", rangeIntersectKeys) + .detail("DataMoveID", dataMoveId); + continue; + } + + if (destId == anonymousShardId) { + wait(cleanUpSingleShardDataMove( + occ, rangeIntersectKeys, lock, startMoveKeysLock, dataMoveId, ddEnabledState)); + } else { + if (cancelConflictingDataMoves) { + TraceEvent( + SevWarn, "StartMoveShardsCancelConflictingDataMove", relocationIntervalId) + .detail("Range", rangeIntersectKeys) + .detail("DataMoveID", dataMoveId) + .detail("ExistingDataMoveID", destId); + wait(cleanUpDataMove(occ, destId, lock, startMoveKeysLock, keys, ddEnabledState)); + } else { + Optional val = wait(tr.get(dataMoveKeyFor(destId))); + ASSERT(val.present()); + DataMoveMetaData dmv = decodeDataMoveValue(val.get()); + TraceEvent( + SevWarnAlways, "StartMoveShardsFoundConflictingDataMove", relocationIntervalId) + .detail("Range", rangeIntersectKeys) + .detail("DataMoveID", dataMoveId) + .detail("ExistingDataMoveID", destId) + .detail("ExistingDataMove", dmv.toString()); + throw movekeys_conflict(); + } + } + } + + // Update dest servers for this range to be equal to servers + krmSetPreviouslyEmptyRange(&tr, + keyServersPrefix, + rangeIntersectKeys, + keyServersValue(src, servers, srcId, dataMoveId), + old[oldIndex + 1].value); + + // Track old destination servers. 
They may be removed from serverKeys soon, since they are + // about to be overwritten in keyServers + for (const UID& ssId : dest) { + oldDests.insert(ssId); + } + + // Keep track of src shards so that we can preserve their values when we overwrite serverKeys + for (const UID& ssId : src) { + physicalShardMap[ssId].emplace_back(rangeIntersectKeys, srcId); + } + + const UID checkpontId = deterministicRandom()->randomUniqueID(); + for (const UID& ssId : src) { + dataMove.src.insert(ssId); + // TODO(psm): Create checkpoint for the range. + } + } + + // Remove old dests from serverKeys. + for (const UID& destId : oldDests) { + if (std::find(servers.begin(), servers.end(), destId) == servers.end()) { + actors.push_back( + unassignServerKeys(&tr, destId, currentKeys, physicalShardMap[destId], dataMoveId)); + } + } + + // Update serverKeys to include keys (or the currently processed subset of keys) for each SS in + // servers. + for (int i = 0; i < servers.size(); i++) { + // Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed + // to have the same shard boundaries If that invariant was important, we would have to move this + // inside the loop above and also set it for the src servers. + actors.push_back(krmSetRangeCoalescing( + &tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, serverKeysValue(dataMoveId))); + } + + dataMove.range = KeyRangeRef(keys.begin, currentKeys.end); + dataMove.dest.insert(servers.begin(), servers.end()); + } + + if (currentKeys.end == keys.end) { + dataMove.setPhase(DataMoveMetaData::Running); + complete = true; + TraceEvent(SevVerbose, "StartMoveShardsDataMoveComplete", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("DataMove", dataMove.toString()); + } else { + dataMove.setPhase(DataMoveMetaData::Prepare); + TraceEvent(SevVerbose, "StartMoveShardsDataMovePartial", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("CurrentRange", currentKeys) + .detail("DataMoveRange", keys) + .detail("NewDataMoveMetaData", dataMove.toString()); + } + + tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove)); + + wait(waitForAll(actors)); + + wait(tr.commit()); + + TraceEvent(SevVerbose, "DataMoveMetaDataCommit", dataMove.id) + .detail("DataMoveID", dataMoveId) + .detail("DataMoveKey", dataMoveKeyFor(dataMoveId)) + .detail("CommitVersion", tr.getCommittedVersion()) + .detail("DeltaRange", currentKeys.toString()) + .detail("Range", dataMove.range.toString()) + .detail("DataMove", dataMove.toString()); + + dataMove = DataMoveMetaData(); + if (complete) { + break; + } + } catch (Error& e) { + TraceEvent(SevWarn, "StartMoveShardsError", dataMoveId) + .errorUnsuppressed(e) + .detail("DataMoveID", dataMoveId) + .detail("DataMoveRange", keys) + .detail("CurrentDataMoveMetaData", dataMove.toString()); + state Error err = e; + if (err.code() == error_code_move_to_removed_server) { + throw; + } + wait(tr.onError(e)); + } + } + } catch (Error& e) { + TraceEvent(SevWarn, "StartMoveShardsError", relocationIntervalId) + .errorUnsuppressed(e) + .detail("DataMoveID", dataMoveId); + throw; + } + + TraceEvent(SevDebug, "StartMoveShardsEnd", relocationIntervalId).detail("DataMoveID", dataMoveId); + + return Void(); +} + +ACTOR static Future checkDataMoveComplete(Database occ, UID dataMoveId, KeyRange keys, UID relocationIntervalId) { + try { + state Reference tr = makeReference(occ); + state Key begin = keys.begin; + while (begin < keys.end) { + loop { + try { + tr->getTransaction().trState->taskID = TaskPriority::MoveKeys; + 
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + // Get all existing shards overlapping keys (exclude any that have been processed in a previous + // iteration of the outer loop) + state KeyRange currentKeys = KeyRangeRef(begin, keys.end); + + state RangeResult keyServers = wait(krmGetRanges(tr, + keyServersPrefix, + currentKeys, + SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, + SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT)); + + // Determine the last processed key (which will be the beginning for the next iteration) + state Key endKey = keyServers.back().key; + currentKeys = KeyRangeRef(currentKeys.begin, endKey); + + // Check that enough servers for each shard are in the correct state + state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + + for (int i = 0; i < keyServers.size() - 1; ++i) { + KeyRangeRef rangeIntersectKeys(keyServers[i].key, keyServers[i + 1].key); + std::vector src; + std::vector dest; + UID srcId; + UID destId; + decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId); + const KeyRange currentRange = KeyRangeRef(keyServers[i].key, keyServers[i + 1].key); + TraceEvent(SevVerbose, "CheckDataMoveCompleteShard", relocationIntervalId) + .detail("Range", currentRange) + .detail("SrcID", srcId) + .detail("Src", describe(src)) + .detail("DestID", destId) + .detail("Dest", describe(dest)); + if (!dest.empty() || srcId != dataMoveId) { + // There is ongoing data move, or the data move is complete, but moved to a different shard. + throw data_move_cancelled(); + } + } + + begin = endKey; + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + } + } catch (Error& e) { + TraceEvent(SevDebug, "CheckDataMoveCompleteError", relocationIntervalId).errorUnsuppressed(e); + throw; + } + + return Void(); +} + +// Set keyServers[keys].src = keyServers[keys].dest and keyServers[keys].dest=[], return when successful +// keyServers[k].dest must be the same for all k in keys. +// Set serverKeys[dest][keys] = dataMoveId; serverKeys[src][keys] = false for all src not in dest. +// Clear dataMoves[dataMoveId]. +ACTOR static Future finishMoveShards(Database occ, + UID dataMoveId, + KeyRange targetKeys, + std::vector destinationTeam, + MoveKeysLock lock, + FlowLock* finishMoveKeysParallelismLock, + bool hasRemote, + UID relocationIntervalId, + std::map tssMapping, + const DDEnabledState* ddEnabledState) { + ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA); + state KeyRange keys = targetKeys; + state Future warningLogger = logWarningAfter("FinishMoveShardsTooLong", 600, destinationTeam); + state int retries = 0; + state DataMoveMetaData dataMove; + state bool complete = false; + + wait(finishMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch)); + state FlowLock::Releaser releaser = FlowLock::Releaser(*finishMoveKeysParallelismLock); + + ASSERT(!destinationTeam.empty()); + + try { + TraceEvent(SevDebug, "FinishMoveShardsBegin", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("TargetRange", keys); + + // This process can be split up into multiple transactions if getRange() doesn't return the entire + // target range. 
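As the comment above notes, one logical move may be committed across several transactions when krmGetRanges() cannot return the whole range at once. A minimal standalone sketch of the general cursor-advancing shape (toy boundary list and names; rowLimit stands in for MOVE_SHARD_KRM_ROW_LIMIT, and the real code also retries each transaction on error):

    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        // Toy shard boundaries standing in for what krmGetRanges() returns.
        const std::vector<std::string> boundaries = { "a", "c", "f", "k", "p", "z" };
        const std::string rangeBegin = "a", rangeEnd = "z";
        const std::size_t rowLimit = 2;

        std::string cursor = rangeBegin;
        while (cursor < rangeEnd) {
            // One "transaction": read at most rowLimit + 1 boundaries starting at the cursor...
            std::vector<std::string> batch;
            for (const std::string& b : boundaries)
                if (b >= cursor && batch.size() < rowLimit + 1)
                    batch.push_back(b);
            // ...update keyServers/serverKeys for [cursor, batch.back()) and commit, then resume there.
            std::cout << "committed [" << cursor << ", " << batch.back() << ")\n";
            cursor = batch.back();
        }
    }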
+ loop { + state std::vector completeSrc; + state std::vector destServers; + state std::unordered_set allServers; + state KeyRange range; + state Transaction tr(occ); + complete = false; + try { + tr.trState->taskID = TaskPriority::MoveKeys; + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + wait(checkMoveKeysLock(&tr, lock, ddEnabledState)); + + Optional val = wait(tr.get(dataMoveKeyFor(dataMoveId))); + if (val.present()) { + dataMove = decodeDataMoveValue(val.get()); + TraceEvent(SevVerbose, "FinishMoveShardsFoundDataMove", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("DataMove", dataMove.toString()); + destServers.insert(destServers.end(), dataMove.dest.begin(), dataMove.dest.end()); + std::sort(destServers.begin(), destServers.end()); + if (dataMove.getPhase() == DataMoveMetaData::Deleting) { + TraceEvent(SevWarn, "FinishMoveShardsDataMoveDeleting", relocationIntervalId) + .detail("DataMoveID", dataMoveId); + throw data_move_cancelled(); + } + ASSERT(dataMove.getPhase() == DataMoveMetaData::Running); + range = dataMove.range; + } else { + TraceEvent(SevWarn, "FinishMoveShardsDataMoveDeleted", relocationIntervalId) + .detail("DataMoveID", dataMoveId); + wait(checkDataMoveComplete(occ, dataMoveId, targetKeys, relocationIntervalId)); + return Void(); + } + + state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + + state RangeResult keyServers = wait(krmGetRanges(&tr, + keyServersPrefix, + range, + SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, + SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT)); + ASSERT(!keyServers.empty()); + range = KeyRangeRef(range.begin, keyServers.back().key); + ASSERT(!range.empty()); + + for (int currentIndex = 0; currentIndex < keyServers.size() - 1; ++currentIndex) { + std::vector src; + std::vector dest; + UID srcId; + UID destId; + decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src, dest, srcId, destId); + const KeyRange currentRange = + KeyRangeRef(keyServers[currentIndex].key, keyServers[currentIndex + 1].key); + TraceEvent(SevVerbose, "FinishMoveShardsProcessingShard", relocationIntervalId) + .detail("Range", currentRange) + .detail("SrcID", srcId) + .detail("Src", describe(src)) + .detail("DestID", destId) + .detail("Dest", describe(dest)); + allServers.insert(src.begin(), src.end()); + allServers.insert(dest.begin(), dest.end()); + if (!destId.isValid()) { + TraceEvent(SevError, "FinishMoveShardsInvalidDestID", relocationIntervalId) + .detail("DataMoveID", dataMoveId); + continue; + } else { + ASSERT(destId == dataMoveId); + std::sort(dest.begin(), dest.end()); + ASSERT(std::equal(destServers.begin(), destServers.end(), dest.begin())); + } + + std::set srcSet; + for (int s = 0; s < src.size(); s++) { + srcSet.insert(src[s]); + } + + if (currentIndex == 0) { + completeSrc = src; + } else { + for (int i = 0; i < completeSrc.size(); i++) { + if (!srcSet.count(completeSrc[i])) { + swapAndPop(&completeSrc, i--); + } + } + } + } + + // Wait for a durable quorum of servers in destServers to have keys available (readWrite) + // They must also have at least the transaction read version so they can't "forget" the shard + // between now and when this transaction commits. 
+ state std::vector> serverReady; // only for count below + state std::vector newDestinations; + std::set completeSrcSet(completeSrc.begin(), completeSrc.end()); + for (const UID& id : destServers) { + newDestinations.push_back(id); + } + + state std::vector storageServerInterfaces; + std::vector>> serverListEntries; + serverListEntries.reserve(newDestinations.size()); + for (const UID& id : newDestinations) { + serverListEntries.push_back(tr.get(serverListKeyFor(id))); + } + state std::vector> serverListValues = wait(getAll(serverListEntries)); + + releaser.release(); + + for (int s = 0; s < serverListValues.size(); s++) { + ASSERT(serverListValues[s] + .present()); // There should always be server list entries for servers in keyServers + auto si = decodeServerListValue(serverListValues[s].get()); + ASSERT(si.id() == newDestinations[s]); + storageServerInterfaces.push_back(si); + } + + // Wait for new destination servers to fetch the data range. + serverReady.reserve(storageServerInterfaces.size()); + for (int s = 0; s < storageServerInterfaces.size(); s++) { + serverReady.push_back(waitForShardReady( + storageServerInterfaces[s], range, tr.getReadVersion().get(), GetShardStateRequest::READABLE)); + } + + // Wait for all storage server moves, and explicitly swallow errors for tss ones with + // waitForAllReady If this takes too long the transaction will time out and retry, which is ok + wait(timeout(waitForAll(serverReady), + SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, + Void(), + TaskPriority::MoveKeys)); + + int count = 0; + for (int s = 0; s < serverReady.size(); ++s) { + count += serverReady[s].isReady() && !serverReady[s].isError(); + } + + TraceEvent(SevVerbose, "FinishMoveShardsWaitedServers", relocationIntervalId) + .detail("ReadyServers", count); + + if (count == newDestinations.size()) { + std::vector> actors; + actors.push_back(krmSetRangeCoalescing( + &tr, keyServersPrefix, range, allKeys, keyServersValue(destServers, {}, dataMoveId, UID()))); + + for (const UID& ssId : allServers) { + const bool destHasServer = + std::find(destServers.begin(), destServers.end(), ssId) != destServers.end(); + actors.push_back( + krmSetRangeCoalescing(&tr, + serverKeysPrefixFor(ssId), + range, + allKeys, + destHasServer ? serverKeysValue(dataMoveId) : serverKeysFalse)); + TraceEvent(SevVerbose, "FinishMoveShardsSetServerKeyRange", dataMoveId) + .detail("StorageServerID", ssId) + .detail("KeyRange", range) + .detail("ShardID", destHasServer ? dataMoveId : UID()); + } + + wait(waitForAll(actors)); + + if (range.end == dataMove.range.end) { + tr.clear(dataMoveKeyFor(dataMoveId)); + complete = true; + TraceEvent(SevVerbose, "FinishMoveShardsDeleteMetaData", dataMoveId) + .detail("DataMove", dataMove.toString()); + } else { + TraceEvent(SevInfo, "FinishMoveShardsPartialComplete", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("CurrentRange", range) + .detail("NewDataMoveMetaData", dataMove.toString()); + dataMove.range = KeyRangeRef(range.end, dataMove.range.end); + tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove)); + } + + wait(tr.commit()); + + if (complete) { + break; + } + } else { + tr.reset(); + } + } catch (Error& error) { + TraceEvent(SevWarn, "TryFinishMoveShardsError", relocationIntervalId) + .errorUnsuppressed(error) + .detail("DataMoveID", dataMoveId); + if (error.code() == error_code_actor_cancelled) + throw; + state Error err = error; + wait(tr.onError(error)); + retries++; + if (retries % 10 == 0) { + TraceEvent(retries == 20 ? 
SevWarnAlways : SevWarn, + "RelocateShard_FinishMoveKeysRetrying", + relocationIntervalId) + .error(err) + .detail("DataMoveID", dataMoveId); + } + } + } + } catch (Error& e) { + TraceEvent(SevWarn, "FinishMoveShardsError", relocationIntervalId).errorUnsuppressed(e); + throw; + } + + TraceEvent(SevDebug, "FinishMoveShardsEnd", relocationIntervalId).detail("DataMoveID", dataMoveId); + return Void(); +} + ACTOR Future> addStorageServer(Database cx, StorageServerInterface server) { state Reference tr = makeReference(cx); state KeyBackedMap tssMapDB = KeyBackedMap(tssMappingKeys.begin); @@ -1200,7 +1986,16 @@ ACTOR Future canRemoveStorageServer(Reference t // Return true if the entire range is false. Since these values are coalesced, we can return false if there is more // than one result - return keys[0].value == serverKeysFalse && keys[1].key == allKeys.end; + UID teamId; + bool assigned, emptyRange; + decodeServerKeysValue(keys[0].value, assigned, emptyRange, teamId); + TraceEvent(SevVerbose, "CanRemoveStorageServer") + .detail("ServerID", serverID) + .detail("Key1", keys[0].key) + .detail("Value1", keys[0].value) + .detail("Key2", keys[1].key) + .detail("Value2", keys[1].value); + return !assigned && keys[1].key == allKeys.end; } ACTOR Future removeStorageServer(Database cx, @@ -1330,6 +2125,8 @@ ACTOR Future removeKeysFromFailedServer(Database cx, state std::vector src; state std::vector dest; + state UID srcId; + state UID destId; // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit while (begin < allKeys.end) { state Transaction tr(cx); @@ -1357,7 +2154,7 @@ ACTOR Future removeKeysFromFailedServer(Database cx, state int i = 0; for (; i < keyServers.size() - 1; ++i) { state KeyValueRef it = keyServers[i]; - decodeKeyServersValue(UIDtoTagMap, it.value, src, dest); + decodeKeyServersValue(UIDtoTagMap, it.value, src, dest, srcId, destId); // The failed server is not present if (std::find(src.begin(), src.end(), serverID) == src.end() && @@ -1370,18 +2167,23 @@ ACTOR Future removeKeysFromFailedServer(Database cx, src.erase(std::remove(src.begin(), src.end(), serverID), src.end()); dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end()); + state KeyRangeRef range(it.key, keyServers[i + 1].key); + // If the last src server is to be removed, first check if there are dest servers who is // hosting a read-write copy of the keyrange, and move such dest servers to the src list. if (src.empty() && !dest.empty()) { - std::vector newSources = - wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key))); + std::vector newSources = wait(pickReadWriteServers(&tr, dest, range)); for (const UID& id : newSources) { TraceEvent(SevWarn, "FailedServerAdditionalSourceServer", serverID) - .detail("Key", it.key) + .detail("Range", range) .detail("NewSourceServerFromDest", id); - dest.erase(std::remove(dest.begin(), dest.end(), id), dest.end()); + if (destId == anonymousShardId) { + dest.erase(std::remove(dest.begin(), dest.end(), id), dest.end()); + } src.push_back(id); + srcId = anonymousShardId; } + // TODO(psm): We may need to cancel the data move since all sources servers are gone. 
} // Move the keyrange to teamForDroppedRange if the src list becomes empty, and also remove the shard @@ -1394,28 +2196,52 @@ ACTOR Future removeKeysFromFailedServer(Database cx, throw internal_error(); } + if (destId.isValid() && destId != anonymousShardId) { + Optional val = wait(tr.get(dataMoveKeyFor(destId))); + if (val.present()) { + state DataMoveMetaData dataMove = decodeDataMoveValue(val.get()); + TraceEvent(SevVerbose, "RemoveRangeFoundDataMove", serverID) + .detail("DataMoveMetaData", dataMove.toString()); + if (range == dataMove.range) { + tr.clear(dataMoveKeyFor(destId)); + } else { + dataMove.setPhase(DataMoveMetaData::Deleting); + tr.set(dataMoveKeyFor(destId), dataMoveValue(dataMove)); + } + } else { + TraceEvent(SevWarnAlways, "DataMoveMissing", serverID) + .detail("DataMoveID", destId) + .detail("Range", range); + } + } + + const UID shardId = newShardId(deterministicRandom()->randomUInt64(), AssignEmptyRange::True); + // Assign the shard to teamForDroppedRange in keyServer space. - tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + tr.set(keyServersKey(it.key), keyServersValue(teamForDroppedRange, {}, shardId, UID())); + } else { + tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange)); + } std::vector> actors; // Unassign the shard from the dest servers. for (const UID& id : dest) { - actors.push_back(krmSetRangeCoalescing(&tr, - serverKeysPrefixFor(id), - KeyRangeRef(it.key, keyServers[i + 1].key), - allKeys, - serverKeysFalse)); + actors.push_back( + krmSetRangeCoalescing(&tr, serverKeysPrefixFor(id), range, allKeys, serverKeysFalse)); } // Assign the shard to the new team as an empty range. // Note, there could be data loss. 
for (const UID& id : teamForDroppedRange) { - actors.push_back(krmSetRangeCoalescing(&tr, - serverKeysPrefixFor(id), - KeyRangeRef(it.key, keyServers[i + 1].key), - allKeys, - serverKeysTrueEmptyRange)); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + actors.push_back(krmSetRangeCoalescing( + &tr, serverKeysPrefixFor(id), range, allKeys, serverKeysValue(shardId))); + } else { + actors.push_back(krmSetRangeCoalescing( + &tr, serverKeysPrefixFor(id), range, allKeys, serverKeysTrueEmptyRange)); + } } wait(waitForAll(actors)); @@ -1432,7 +2258,13 @@ ACTOR Future removeKeysFromFailedServer(Database cx, .detail("Key", it.key) .detail("ValueSrc", describe(src)) .detail("ValueDest", describe(dest)); - tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest)); + if (srcId != anonymousShardId) { + if (dest.empty()) + destId = UID(); + tr.set(keyServersKey(it.key), keyServersValue(src, dest, srcId, destId)); + } else { + tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest)); + } } } @@ -1458,7 +2290,155 @@ ACTOR Future removeKeysFromFailedServer(Database cx, return Void(); } +ACTOR Future cleanUpDataMove(Database occ, + UID dataMoveId, + MoveKeysLock lock, + FlowLock* cleanUpDataMoveParallelismLock, + KeyRange keys, + const DDEnabledState* ddEnabledState) { + TraceEvent(SevVerbose, "CleanUpDataMoveBegin", dataMoveId).detail("DataMoveID", dataMoveId).detail("Range", keys); + state bool complete = false; + + wait(cleanUpDataMoveParallelismLock->take(TaskPriority::DataDistributionLaunch)); + state FlowLock::Releaser releaser = FlowLock::Releaser(*cleanUpDataMoveParallelismLock); + + try { + loop { + state Transaction tr(occ); + state std::unordered_map> physicalShardMap; + state std::set oldDests; + state KeyRange range; + + try { + tr.trState->taskID = TaskPriority::MoveKeys; + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + wait(checkMoveKeysLock(&tr, lock, ddEnabledState)); + + Optional val = wait(tr.get(dataMoveKeyFor(dataMoveId))); + if (val.present()) { + state DataMoveMetaData dataMove = decodeDataMoveValue(val.get()); + TraceEvent(SevVerbose, "CleanUpDataMoveMetaData", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("DataMoveMetaData", dataMove.toString()); + range = dataMove.range; + ASSERT(!range.empty()); + } else { + TraceEvent(SevDebug, "CleanUpDataMoveNotExist", dataMoveId).detail("DataMoveID", dataMoveId); + break; + } + + dataMove.setPhase(DataMoveMetaData::Deleting); + + state RangeResult currentShards = wait(krmGetRanges(&tr, + keyServersPrefix, + range, + SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, + SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT)); + ASSERT(!currentShards.empty()); + ASSERT(range.begin == currentShards.front().key); + range = KeyRangeRef(range.begin, currentShards.back().key); + + state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + + // For each intersecting range, clear existing dest servers and checkpoints on all src servers. 
+ state int i = 0; + for (; i < currentShards.size() - 1; ++i) { + KeyRangeRef rangeIntersectKeys(currentShards[i].key, currentShards[i + 1].key); + std::vector src; + std::vector dest; + UID srcId, destId; + decodeKeyServersValue(UIDtoTagMap, currentShards[i].value, src, dest, srcId, destId); + + for (const auto& uid : src) { + physicalShardMap[uid].push_back(Shard(rangeIntersectKeys, srcId)); + } + + TraceEvent(SevVerbose, "CleanUpDataMoveShard", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("ShardRange", rangeIntersectKeys) + .detail("Src", describe(src)) + .detail("Dest", describe(dest)) + .detail("SrcID", srcId) + .detail("DestID", destId) + .detail("ReadVersion", tr.getReadVersion().get()); + + if (destId != dataMoveId) { + TraceEvent(SevVerbose, "CleanUpDataMoveSkipShard", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("ShardRange", rangeIntersectKeys) + .detail("Src", describe(src)) + .detail("Dest", describe(dest)) + .detail("SrcID", srcId) + .detail("DestID", destId) + .detail("ReadVersion", tr.getReadVersion().get()); + continue; + } + + for (const auto& uid : dest) { + oldDests.insert(uid); + } + + krmSetPreviouslyEmptyRange(&tr, + keyServersPrefix, + rangeIntersectKeys, + keyServersValue(src, {}, srcId, UID()), + currentShards[i + 1].value); + } + + if (range.end == dataMove.range.end) { + tr.clear(dataMoveKeyFor(dataMoveId)); + complete = true; + TraceEvent(SevVerbose, "CleanUpDataMoveDeleteMetaData", dataMoveId) + .detail("DataMoveID", dataMove.toString()); + + } else { + dataMove.range = KeyRangeRef(range.end, dataMove.range.end); + dataMove.setPhase(DataMoveMetaData::Deleting); + tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove)); + TraceEvent(SevVerbose, "CleanUpDataMovePartial", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("CurrentRange", range) + .detail("NewDataMove", dataMove.toString()); + } + + std::vector> actors; + for (const auto& uid : oldDests) { + actors.push_back(unassignServerKeys(&tr, uid, range, physicalShardMap[uid], dataMoveId)); + } + wait(waitForAll(actors)); + + wait(tr.commit()); + + TraceEvent(SevVerbose, "CleanUpDataMoveCommitted", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("Range", range); + if (complete) { + break; + } + } catch (Error& e) { + state Error err = e; + wait(tr.onError(e)); + TraceEvent(SevWarn, "CleanUpDataMoveRetriableError", dataMoveId) + .error(err) + .detail("DataMoveRange", range.toString()); + } + } + } catch (Error& e) { + TraceEvent(SevWarn, "CleanUpDataMoveFail", dataMoveId).errorUnsuppressed(e); + throw; + } + + TraceEvent(SevDebug, "CleanUpDataMoveEnd", dataMoveId) + .detail("DataMoveID", dataMoveId) + .detail("DataMoveRange", range.toString()); + + return Void(); +} + ACTOR Future moveKeys(Database cx, + UID dataMoveId, KeyRange keys, std::vector destinationTeam, std::vector healthyDestinations, @@ -1468,33 +2448,60 @@ ACTOR Future moveKeys(Database cx, FlowLock* finishMoveKeysParallelismLock, bool hasRemote, UID relocationIntervalId, - const DDEnabledState* ddEnabledState) { + const DDEnabledState* ddEnabledState, + CancelConflictingDataMoves cancelConflictingDataMoves) { ASSERT(destinationTeam.size()); std::sort(destinationTeam.begin(), destinationTeam.end()); state std::map tssMapping; - wait(startMoveKeys(cx, - keys, - destinationTeam, - lock, - startMoveKeysParallelismLock, - relocationIntervalId, - &tssMapping, - ddEnabledState)); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + wait(startMoveShards(cx, + dataMoveId, + keys, + destinationTeam, + 
lock, + startMoveKeysParallelismLock, + relocationIntervalId, + ddEnabledState, + cancelConflictingDataMoves)); + + } else { + wait(startMoveKeys(cx, + keys, + destinationTeam, + lock, + startMoveKeysParallelismLock, + relocationIntervalId, + &tssMapping, + ddEnabledState)); + } state Future completionSignaller = checkFetchingState(cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId, tssMapping); - wait(finishMoveKeys(cx, - keys, - destinationTeam, - lock, - finishMoveKeysParallelismLock, - hasRemote, - relocationIntervalId, - tssMapping, - ddEnabledState)); + if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { + wait(finishMoveShards(cx, + dataMoveId, + keys, + destinationTeam, + lock, + finishMoveKeysParallelismLock, + hasRemote, + relocationIntervalId, + tssMapping, + ddEnabledState)); + } else { + wait(finishMoveKeys(cx, + keys, + destinationTeam, + lock, + finishMoveKeysParallelismLock, + hasRemote, + relocationIntervalId, + tssMapping, + ddEnabledState)); + } // This is defensive, but make sure that we always say that the movement is complete before moveKeys completes completionSignaller.cancel(); @@ -1556,9 +2563,21 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vectorSHARD_ENCODE_LOCATION_METADATA) { + const UID teamId = deterministicRandom()->randomUniqueID(); + ksValue = keyServersValue(serverSrcUID, /*dest=*/std::vector(), teamId, UID()); + krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value()); - for (auto& s : servers) { - krmSetPreviouslyEmptyRange(tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysTrue, serverKeysFalse); + for (auto& s : servers) { + krmSetPreviouslyEmptyRange( + tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysValue(teamId), serverKeysFalse); + } + } else { + krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value()); + + for (auto& s : servers) { + krmSetPreviouslyEmptyRange( + tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysTrue, serverKeysFalse); + } } } diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index f6db71ddd5..d6a5b6a8b3 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -142,6 +142,10 @@ int64_t getQueueSize(const TraceEventFields& md) { return inputBytes - durableBytes; } +int64_t getDurableVersion(const TraceEventFields& md) { + return boost::lexical_cast(md.getValue("DurableVersion")); +} + // Computes the popped version lag for tlogs int64_t getPoppedVersionLag(const TraceEventFields& md) { int64_t persistentDataDurableVersion = boost::lexical_cast(md.getValue("PersistentDataDurableVersion")); @@ -355,23 +359,42 @@ int64_t extractMaxQueueSize(const std::vector>& message } // Timeout wrapper when getting the storage metrics. 
This will do some additional tracing -ACTOR Future getStorageMetricsTimeout(UID storage, WorkerInterface wi) { - state Future result = - wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics"))); - state Future timeout = delay(1.0); - choose { - when(TraceEventFields res = wait(result)) { return res; } - when(wait(timeout)) { +ACTOR Future getStorageMetricsTimeout(UID storage, WorkerInterface wi, Version version) { + state int retries = 0; + loop { + ++retries; + state Future result = + wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics"))); + state Future timeout = delay(30.0); + choose { + when(TraceEventFields res = wait(result)) { + if (version == invalidVersion || getDurableVersion(res) >= static_cast(version)) { + return res; + } + } + when(wait(timeout)) { + TraceEvent("QuietDatabaseFailure") + .detail("Reason", "Could not fetch StorageMetrics") + .detail("Storage", format("%016" PRIx64, storage.first())); + throw timed_out(); + } + } + if (retries > 30) { TraceEvent("QuietDatabaseFailure") - .detail("Reason", "Could not fetch StorageMetrics") - .detail("Storage", format("%016" PRIx64, storage.first())); + .detail("Reason", "Could not fetch StorageMetrics x30") + .detail("Storage", format("%016" PRIx64, storage.first())) + .detail("Version", version); throw timed_out(); } + wait(delay(1.0)); } -}; +} // Gets the maximum size of all the storage server queues -ACTOR Future getMaxStorageServerQueueSize(Database cx, Reference const> dbInfo) { +ACTOR Future getMaxStorageServerQueueSize(Database cx, + Reference const> dbInfo, + Version version) { + TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers"); Future> serversFuture = getStorageServers(cx); @@ -394,7 +417,7 @@ ACTOR Future getMaxStorageServerQueueSize(Database cx, Referencesecond)); + messages.push_back(getStorageMetricsTimeout(servers[i].id(), itr->second, version)); } wait(waitForAll(messages)); @@ -767,7 +790,7 @@ ACTOR Future waitForQuietDatabase(Database cx, // To get around this, quiet Database will disable the perpetual wiggle in the setup phase. printf("Set perpetual_storage_wiggle=0 ...\n"); - wait(setPerpetualStorageWiggle(cx, false, LockAware::True)); + state Version version = wait(setPerpetualStorageWiggle(cx, false, LockAware::True)); printf("Set perpetual_storage_wiggle=0 Done.\n"); // Require 3 consecutive successful quiet database checks spaced 2 second apart @@ -784,7 +807,7 @@ ACTOR Future waitForQuietDatabase(Database cx, tLogQueueInfo = getTLogQueueInfo(cx, dbInfo); dataDistributionQueueSize = getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0); teamCollectionValid = getTeamCollectionValid(cx, distributorWorker); - storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo); + storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo, version); dataDistributionActive = getDataDistributionActive(cx, distributorWorker); storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID); versionOffset = getVersionOffset(cx, distributorWorker, dbInfo); diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index fc74c9af19..57db74ae8e 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -1764,7 +1764,9 @@ void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) { // Also configures the cluster behaviour through setting some flags on the simulator. 
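A standalone sketch of the retry policy that the getStorageMetricsTimeout() change above introduces: keep re-polling a storage server until its reported DurableVersion has caught up to the version obtained from setPerpetualStorageWiggle(), and give up after 30 attempts. The poll function below is a toy stand-in for fetching the "/StorageMetrics" trace event; all names are illustrative.

    #include <cstdint>
    #include <iostream>
    #include <stdexcept>

    // Pretends the server's durable version advances a little on each poll.
    int64_t pollDurableVersion() {
        static int64_t v = 95;
        return v += 2;
    }

    // Re-poll until the server has made `target` durable, giving up after a
    // bounded number of attempts (the real code uses 30 and throws timed_out()).
    int64_t waitForDurableVersion(int64_t target, int maxRetries = 30) {
        for (int attempt = 1; attempt <= maxRetries; ++attempt) {
            const int64_t durable = pollDurableVersion();
            if (durable >= target)
                return durable; // metrics read alongside this value are now recent enough
        }
        throw std::runtime_error("timed_out");
    }

    int main() {
        std::cout << "durable version reached: " << waitForDurableVersion(101) << "\n";
    }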
void SimulationConfig::setTss(const TestConfig& testConfig) { int tssCount = 0; - if (!testConfig.simpleConfig && !testConfig.disableTss && deterministicRandom()->random01() < 0.25) { + // TODO: Support TSS in SHARD_ENCODE_LOCATION_METADATA mode. + if (!testConfig.simpleConfig && !testConfig.disableTss && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && + deterministicRandom()->random01() < 0.25) { // 1 or 2 tss tssCount = deterministicRandom()->randomInt(1, 3); } diff --git a/fdbserver/TCInfo.actor.cpp b/fdbserver/TCInfo.actor.cpp index dce3a422dd..98f5e81f21 100644 --- a/fdbserver/TCInfo.actor.cpp +++ b/fdbserver/TCInfo.actor.cpp @@ -319,6 +319,21 @@ TCTeamInfo::TCTeamInfo(std::vector> const& servers, Opti } } +// static +std::string TCTeamInfo::serversToString(std::vector servers) { + if (servers.empty()) { + return "[unset]"; + } + + std::sort(servers.begin(), servers.end()); + std::stringstream ss; + for (const auto& id : servers) { + ss << id.toString() << " "; + } + + return ss.str(); +} + std::vector TCTeamInfo::getLastKnownServerInterfaces() const { std::vector v; v.reserve(servers.size()); @@ -329,16 +344,7 @@ std::vector TCTeamInfo::getLastKnownServerInterfaces() c } std::string TCTeamInfo::getServerIDsStr() const { - std::stringstream ss; - - if (serverIDs.empty()) - return "[unset]"; - - for (const auto& id : serverIDs) { - ss << id.toString() << " "; - } - - return std::move(ss).str(); + return serversToString(this->serverIDs); } void TCTeamInfo::addDataInFlightToTeam(int64_t delta) { diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index fb369ec8fb..117ede61d0 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -24,23 +24,54 @@ #elif !defined(FDBSERVER_DATA_DISTRIBUTION_ACTOR_H) #define FDBSERVER_DATA_DISTRIBUTION_ACTOR_H -#include -#include #include "fdbclient/NativeAPI.actor.h" -#include "fdbserver/MoveKeys.actor.h" -#include "fdbserver/LogSystem.h" #include "fdbclient/RunTransaction.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/LogSystem.h" +#include "fdbserver/MoveKeys.actor.h" +#include +#include + #include "flow/actorcompiler.h" // This must be the last #include. enum class RelocateReason { INVALID = -1, OTHER, REBALANCE_DISK, REBALANCE_READ }; +struct DDShardInfo; + +// Represents a data move in DD. +struct DataMove { + DataMove() : meta(DataMoveMetaData()), restore(false), valid(false), cancelled(false) {} + explicit DataMove(DataMoveMetaData meta, bool restore) + : meta(std::move(meta)), restore(restore), valid(true), cancelled(meta.getPhase() == DataMoveMetaData::Deleting) { + } + + // Checks if the DataMove is consistent with the shard. + void validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority = SERVER_KNOBS->PRIORITY_RECOVER_MOVE); + + bool isCancelled() const { return this->cancelled; } + + const DataMoveMetaData meta; + bool restore; // The data move is scheduled by a previous DD, and is being recovered now. + bool valid; // The data move data is integral. + bool cancelled; // The data move has been cancelled. + std::vector primarySrc; + std::vector remoteSrc; + std::vector primaryDest; + std::vector remoteDest; +}; + struct RelocateShard { KeyRange keys; int priority; + bool cancelled; // The data move should be cancelled. + std::shared_ptr dataMove; // Not null if this is a restored data move. 
+ UID dataMoveId; RelocateReason reason; - RelocateShard() : priority(0), reason(RelocateReason::INVALID) {} + RelocateShard() : priority(0), cancelled(false), dataMoveId(anonymousShardId), reason(RelocateReason::INVALID) {} RelocateShard(KeyRange const& keys, int priority, RelocateReason reason) - : keys(keys), priority(priority), reason(reason) {} + : keys(keys), priority(priority), cancelled(false), dataMoveId(anonymousShardId), reason(reason) {} + + bool isRestore() const { return this->dataMove != nullptr; } }; struct IDataDistributionTeam { @@ -88,6 +119,7 @@ FDB_DECLARE_BOOLEAN_PARAM(PreferLowerDiskUtil); FDB_DECLARE_BOOLEAN_PARAM(TeamMustHaveShards); FDB_DECLARE_BOOLEAN_PARAM(ForReadBalance); FDB_DECLARE_BOOLEAN_PARAM(PreferLowerReadUtil); +FDB_DECLARE_BOOLEAN_PARAM(FindTeamByServers); struct GetTeamRequest { bool wantsNewServers; // In additional to servers in completeSources, try to find teams with new server @@ -97,6 +129,7 @@ struct GetTeamRequest { bool forReadBalance; bool preferLowerReadUtil; // only make sense when forReadBalance is true double inflightPenalty; + bool findTeamByServers; std::vector completeSources; std::vector src; Promise>, bool>> reply; @@ -113,7 +146,13 @@ struct GetTeamRequest { double inflightPenalty = 1.0) : wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerDiskUtil(preferLowerDiskUtil), teamMustHaveShards(teamMustHaveShards), forReadBalance(forReadBalance), - preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty) {} + preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty), + findTeamByServers(FindTeamByServers::False) {} + GetTeamRequest(std::vector servers) + : wantsNewServers(WantNewServers::False), wantsTrueBest(WantTrueBest::False), + preferLowerDiskUtil(PreferLowerDiskUtil::False), teamMustHaveShards(TeamMustHaveShards::False), + forReadBalance(ForReadBalance::False), preferLowerReadUtil(PreferLowerReadUtil::False), inflightPenalty(1.0), + findTeamByServers(FindTeamByServers::True), src(std::move(servers)) {} // return true if a.score < b.score [[nodiscard]] bool lessCompare(TeamRef a, TeamRef b, int64_t aLoadBytes, int64_t bLoadBytes) const { @@ -129,7 +168,8 @@ struct GetTeamRequest { ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest << " PreferLowerDiskUtil:" << preferLowerDiskUtil << " teamMustHaveShards:" << teamMustHaveShards - << "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty << ";"; + << "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty + << " findTeamByServers:" << findTeamByServers << ";"; ss << "CompleteSources:"; for (const auto& cs : completeSources) { ss << cs.toString() << ","; @@ -224,6 +264,8 @@ public: bool operator>=(const Team& r) const { return !(*this < r); } bool operator==(const Team& r) const { return servers == r.servers && primary == r.primary; } bool operator!=(const Team& r) const { return !(*this == r); } + + std::string toString() const { return describe(servers); }; }; // This tracks the data distribution on the data distribution server so that teamTrackers can @@ -254,6 +296,8 @@ public: void finishMove(KeyRangeRef keys); void check() const; + PromiseStream restartShardTracker; + private: struct OrderByTeamKey { bool operator()(const std::pair& lhs, const std::pair& rhs) const { @@ -283,17 +327,23 @@ struct DDShardInfo { std::vector primaryDest; std::vector remoteDest; bool hasDest; + UID srcId; + UID destId; explicit DDShardInfo(Key key) : key(key), 
@@ -224,6 +264,8 @@ public:
 		bool operator>=(const Team& r) const { return !(*this < r); }
 		bool operator==(const Team& r) const { return servers == r.servers && primary == r.primary; }
 		bool operator!=(const Team& r) const { return !(*this == r); }
+
+		std::string toString() const { return describe(servers); };
 	};
 
 	// This tracks the data distribution on the data distribution server so that teamTrackers can
@@ -254,6 +296,8 @@ public:
 	void finishMove(KeyRangeRef keys);
 	void check() const;
 
+	PromiseStream restartShardTracker;
+
 private:
 	struct OrderByTeamKey {
 		bool operator()(const std::pair& lhs, const std::pair& rhs) const {
@@ -283,17 +327,23 @@ struct DDShardInfo {
 	std::vector<UID> primaryDest;
 	std::vector<UID> remoteDest;
 	bool hasDest;
+	UID srcId;
+	UID destId;
 
 	explicit DDShardInfo(Key key) : key(key), hasDest(false) {}
+	DDShardInfo(Key key, UID srcId, UID destId) : key(key), hasDest(false), srcId(srcId), destId(destId) {}
 };
 
 struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
+	InitialDataDistribution() : dataMoveMap(std::make_shared<DataMove>()) {}
+
 	int mode;
 	std::vector> allServers;
 	std::set> primaryTeams;
 	std::set> remoteTeams;
 	std::vector shards;
 	Optional initHealthyZoneValue;
+	KeyRangeMap<std::shared_ptr<DataMove>> dataMoveMap;
 };
 
 struct ShardMetrics {
diff --git a/fdbserver/include/fdbserver/MoveKeys.actor.h b/fdbserver/include/fdbserver/MoveKeys.actor.h
index 92a14b1eb6..b809b02521 100644
--- a/fdbserver/include/fdbserver/MoveKeys.actor.h
+++ b/fdbserver/include/fdbserver/MoveKeys.actor.h
@@ -25,12 +25,15 @@
 #elif !defined(FDBSERVER_MOVEKEYS_ACTOR_H)
 #define FDBSERVER_MOVEKEYS_ACTOR_H
 
-#include "fdbclient/NativeAPI.actor.h"
 #include "fdbclient/CommitTransaction.h"
 #include "fdbclient/KeyRangeMap.h"
+#include "fdbclient/NativeAPI.actor.h"
 #include "fdbserver/MasterInterface.h"
+#include "flow/BooleanParam.h"
 #include "flow/actorcompiler.h"
 
+FDB_DECLARE_BOOLEAN_PARAM(CancelConflictingDataMoves);
+
 struct MoveKeysLock {
 	UID prevOwner, myOwner, prevWrite;
 	template
@@ -66,7 +69,13 @@ void seedShardServers(Arena& trArena, CommitTransactionRef& tr, std::vector
 
 ACTOR Future moveKeys(Database occ,
+                      UID dataMoveId,
                       KeyRange keys,
                       std::vector<UID> destinationTeam,
                       std::vector<UID> healthyDestinations,
@@ -76,11 +85,16 @@ ACTOR Future moveKeys(Database occ,
                       FlowLock* finishMoveKeysParallelismLock,
                       bool hasRemote,
                       UID relocationIntervalId, // for logging only
-                      const DDEnabledState* ddEnabledState);
-// Eventually moves the given keys to the given destination team
-// Caller is responsible for cancelling it before issuing an overlapping move,
-// for restarting the remainder, and for not otherwise cancelling it before
-// it returns (since it needs to execute the finishMoveKeys transaction).
+                      const DDEnabledState* ddEnabledState,
+                      CancelConflictingDataMoves cancelConflictingDataMoves = CancelConflictingDataMoves::False);
+
+// Cancels a data move designated by dataMoveId.
+ACTOR Future cleanUpDataMove(Database occ,
+                             UID dataMoveId,
+                             MoveKeysLock lock,
+                             FlowLock* cleanUpDataMoveParallelismLock,
+                             KeyRange range,
+                             const DDEnabledState* ddEnabledState);
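// Sketch (not part of the patch): cancelling an in-flight data move and removing its metadata
// with the new cleanUpDataMove() call. The retry policy and the surrounding actor are
// assumptions; the argument order follows the declaration above, and the movekeys_conflict
// handling mirrors the workloads further down.
ACTOR Future<Void> cancelDataMove(Database cx,
                                  UID dataMoveId,
                                  KeyRange range,
                                  MoveKeysLock lock,
                                  FlowLock* cleanUpDataMoveParallelismLock,
                                  const DDEnabledState* ddEnabledState) {
	loop {
		try {
			wait(cleanUpDataMove(cx, dataMoveId, lock, cleanUpDataMoveParallelismLock, range, ddEnabledState));
			return Void();
		} catch (Error& e) {
			if (e.code() != error_code_movekeys_conflict) {
				throw;
			}
			// Another owner of the MoveKeysLock (e.g. a newly elected DD) won the race;
			// back off and retry, as the workloads below do for movekeys_conflict.
			wait(delay(1.0));
		}
	}
}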
 
 ACTOR Future> addStorageServer(Database cx, StorageServerInterface server);
 // Adds a newly recruited storage server to a database (e.g. adding it to FF/serverList)
diff --git a/fdbserver/include/fdbserver/QuietDatabase.h b/fdbserver/include/fdbserver/QuietDatabase.h
index b3a212156f..f2f23b5e6a 100644
--- a/fdbserver/include/fdbserver/QuietDatabase.h
+++ b/fdbserver/include/fdbserver/QuietDatabase.h
@@ -24,13 +24,16 @@
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbclient/DatabaseContext.h" // for clone()
+#include "fdbclient/FDBTypes.h"
 #include "fdbserver/TesterInterface.actor.h"
 #include "fdbserver/WorkerInterface.actor.h"
 
 Future getDataInFlight(Database const& cx, Reference const> const&);
 Future> getTLogQueueInfo(Database const& cx, Reference const> const&);
-Future getMaxStorageServerQueueSize(Database const& cx, Reference const> const&);
+Future getMaxStorageServerQueueSize(Database const& cx,
+                                    Reference const> const&,
+                                    Version const& version);
 Future getDataDistributionQueueSize(Database const& cx,
                                     Reference const> const&,
                                     bool const& reportInFlight);
diff --git a/fdbserver/include/fdbserver/TCInfo.h b/fdbserver/include/fdbserver/TCInfo.h
index fb5b2bf57f..1abc393c0b 100644
--- a/fdbserver/include/fdbserver/TCInfo.h
+++ b/fdbserver/include/fdbserver/TCInfo.h
@@ -184,6 +184,8 @@ public:
 
 	Optional>& getTenant() { return tenant; }
 
+	static std::string serversToString(std::vector<UID> servers);
+
 	std::string getTeamID() const override { return id.shortString(); }
 
 	std::vector getLastKnownServerInterfaces() const override;
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 9a7a97c1eb..10125b09f9 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -6678,6 +6678,7 @@ private:
 	KeyRef startKey;
 	bool nowAssigned;
 	bool emptyRange;
+	UID dataMoveId;
 	bool processedStartKey;
 
 	KeyRef cacheStartKey;
@@ -6712,8 +6713,7 @@ private:
 			// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same
 			// keys
 			startKey = m.param1;
-			nowAssigned = m.param2 != serverKeysFalse;
-			emptyRange = m.param2 == serverKeysTrueEmptyRange;
+			decodeServerKeysValue(m.param2, nowAssigned, emptyRange, dataMoveId);
 			processedStartKey = true;
 		} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
 			// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
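// Sketch (not part of the patch): what a consumer of the new serverKeys encoding might do
// with the decoded fields. The parameter types of decodeServerKeysValue() are assumed from
// the call site above, and the TraceEvent name is hypothetical.
void logServerKeysEntry(KeyRef startKey, ValueRef value) {
	bool nowAssigned = false;
	bool emptyRange = false;
	UID dataMoveId;
	// With SHARD_ENCODE_LOCATION_METADATA enabled, the value carries the id of the data move
	// that assigned the range (or anonymousShardId for legacy entries), in addition to the
	// assigned/empty flags that were previously compared against serverKeysFalse and
	// serverKeysTrueEmptyRange directly.
	decodeServerKeysValue(value, nowAssigned, emptyRange, dataMoveId);
	TraceEvent("ServerKeysEntrySketch")
	    .detail("StartKey", startKey)
	    .detail("Assigned", nowAssigned)
	    .detail("EmptyRange", emptyRange)
	    .detail("DataMoveID", dataMoveId);
}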
diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp
index 273cd9f6b0..b7fd85c554 100644
--- a/fdbserver/tester.actor.cpp
+++ b/fdbserver/tester.actor.cpp
@@ -1652,7 +1652,7 @@ ACTOR Future runTests(ReferencedbInfo));
+	int64_t maxStorageServerQueueSize =
+	    wait(getMaxStorageServerQueueSize(cx, self->dbInfo, invalidVersion));
 	if (maxStorageServerQueueSize > 0) {
 		TraceEvent("ConsistencyCheck_ExceedStorageServerQueueLimit")
 		    .detail("MaxQueueSize", maxStorageServerQueueSize);
diff --git a/fdbserver/workloads/DataLossRecovery.actor.cpp b/fdbserver/workloads/DataLossRecovery.actor.cpp
index a5f7d24af2..048b6d4bec 100644
--- a/fdbserver/workloads/DataLossRecovery.actor.cpp
+++ b/fdbserver/workloads/DataLossRecovery.actor.cpp
@@ -49,7 +49,7 @@ struct DataLossRecoveryWorkload : TestWorkload {
 	NetworkAddress addr;
 
 	DataLossRecoveryWorkload(WorkloadContext const& wcx)
-	  : TestWorkload(wcx), startMoveKeysParallelismLock(1), finishMoveKeysParallelismLock(1), enabled(!clientId),
+	  : TestWorkload(wcx), startMoveKeysParallelismLock(5), finishMoveKeysParallelismLock(5), enabled(!clientId),
 	    pass(true) {}
 
 	void validationFailed(ErrorOr> expectedValue, ErrorOr> actualValue) {
@@ -78,19 +78,26 @@ struct DataLossRecoveryWorkload : TestWorkload {
 		wait(self->writeAndVerify(self, cx, key, oldValue));
 
+		TraceEvent("DataLossRecovery").detail("Phase", "InitialWrites");
 		// Move [key, endKey) to team: {address}.
 		state NetworkAddress address = wait(self->disableDDAndMoveShard(self, cx, KeyRangeRef(key, endKey)));
+		TraceEvent("DataLossRecovery").detail("Phase", "Moved");
 
 		wait(self->readAndVerify(self, cx, key, oldValue));
+		TraceEvent("DataLossRecovery").detail("Phase", "ReadAfterMove");
 
 		// Kill team {address}, and expect read to timeout.
 		self->killProcess(self, address);
+		TraceEvent("DataLossRecovery").detail("Phase", "KilledProcess");
 
 		wait(self->readAndVerify(self, cx, key, timed_out()));
+		TraceEvent("DataLossRecovery").detail("Phase", "VerifiedReadTimeout");
 
 		// Reenable DD and exclude address as fail, so that [key, endKey) will be dropped and moved to a new team.
 		// Expect read to return 'value not found'.
 		int ignore = wait(setDDMode(cx, 1));
 		wait(self->exclude(cx, address));
+		TraceEvent("DataLossRecovery").detail("Phase", "Excluded");
 
 		wait(self->readAndVerify(self, cx, key, Optional()));
+		TraceEvent("DataLossRecovery").detail("Phase", "VerifiedDataDropped");
 
 		// Write will succeed.
 		wait(self->writeAndVerify(self, cx, key, newValue));
@@ -172,6 +179,7 @@ struct DataLossRecoveryWorkload : TestWorkload {
 	ACTOR Future disableDDAndMoveShard(DataLossRecoveryWorkload* self, Database cx, KeyRange keys) {
 		// Disable DD to avoid DD undoing of our move.
 		state int ignore = wait(setDDMode(cx, 0));
+		TraceEvent("DataLossRecovery").detail("Phase", "DisabledDD");
 		state NetworkAddress addr;
 
 		// Pick a random SS as the dest, keys will reside on a single server after the move.
@@ -203,7 +211,9 @@ struct DataLossRecoveryWorkload : TestWorkload {
 			MoveKeysLock moveKeysLock;
 			moveKeysLock.myOwner = owner;
 
+			TraceEvent("DataLossRecovery").detail("Phase", "StartMoveKeys");
 			wait(moveKeys(cx,
+			              deterministicRandom()->randomUniqueID(),
 			              keys,
 			              dest,
 			              dest,
@@ -213,9 +223,11 @@ struct DataLossRecoveryWorkload : TestWorkload {
 			              &self->finishMoveKeysParallelismLock,
 			              false,
 			              UID(), // for logging only
-			              &ddEnabledState));
+			              &ddEnabledState,
+			              CancelConflictingDataMoves::True));
 			break;
 		} catch (Error& e) {
+			TraceEvent("DataLossRecovery").error(e).detail("Phase", "MoveRangeError");
 			if (e.code() == error_code_movekeys_conflict) {
 				// Conflict on moveKeysLocks with the current running DD is expected, just retry.
 				tr.reset();
diff --git a/fdbserver/workloads/RandomMoveKeys.actor.cpp b/fdbserver/workloads/RandomMoveKeys.actor.cpp
index de6255c000..6ec85f8381 100644
--- a/fdbserver/workloads/RandomMoveKeys.actor.cpp
+++ b/fdbserver/workloads/RandomMoveKeys.actor.cpp
@@ -143,6 +143,7 @@ struct MoveKeysWorkload : TestWorkload {
 		state Promise signal;
 		state DDEnabledState ddEnabledState;
 		wait(moveKeys(cx,
+		              deterministicRandom()->randomUniqueID(),
 		              keys,
 		              destinationTeamIDs,
 		              destinationTeamIDs,
@@ -152,7 +153,8 @@ struct MoveKeysWorkload : TestWorkload {
 		              &fl2,
 		              false,
 		              relocateShardInterval.pairID,
-		              &ddEnabledState));
+		              &ddEnabledState,
+		              CancelConflictingDataMoves::True));
 		TraceEvent(relocateShardInterval.end()).detail("Result", "Success");
 		return Void();
 	} catch (Error& e) {
diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp
index 4b82c77438..1d0923bd55 100644
--- a/flow/Knobs.cpp
+++ b/flow/Knobs.cpp
@@ -209,7 +209,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
 	init( ZERO_LENGTH_FILE_PAD, 1 );
 	init( TRACE_FLUSH_INTERVAL, 0.25 );
 	init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
-	init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
+	init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
 	init( MAX_TRACE_SUPPRESSIONS, 1e4 );
 	init( TRACE_DATETIME_ENABLED, true ); // trace time in human readable format (always real time)
 	init( TRACE_SYNC_ENABLED, 0 );
diff --git a/flow/include/flow/ProtocolVersion.h b/flow/include/flow/ProtocolVersion.h
index cec0cddbb6..323dcffcf0 100644
--- a/flow/include/flow/ProtocolVersion.h
+++ b/flow/include/flow/ProtocolVersion.h
@@ -173,6 +173,7 @@ public: // introduced features
 	PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
 	PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, EncryptionAtRest);
+	PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, ShardEncodeLocationMetaData);
 };
 
 template <>
diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h
index 366d1b414b..519a0cb8b0 100755
--- a/flow/include/flow/error_definitions.h
+++ b/flow/include/flow/error_definitions.h
@@ -95,6 +95,8 @@ ERROR( page_encoding_not_supported, 1071, "Page encoding type is not supported o
 ERROR( page_decoding_failed, 1072, "Page content decoding failed" )
 ERROR( unexpected_encoding_type, 1073, "Page content decoding failed" )
 ERROR( encryption_key_not_found, 1074, "Encryption key not found" )
+ERROR( data_move_cancelled, 1075, "Data move was cancelled" )
+ERROR( data_move_dest_team_not_found, 1076, "Dest team was not found for data move" )
 
 ERROR( broken_promise, 1100, "Broken promise" )
 ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index ee5360ba99..0b49bd5089 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -186,6 +186,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES fast/WriteDuringRead.toml)
   add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml)
   add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
+  add_fdb_test(TEST_FILES noSim/SystemDataTest.toml UNIT)
   if (WITH_ROCKSDB_EXPERIMENTAL)
     add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
     add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
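// Sketch (not part of the patch): how a relocation loop might react to the two new error
// codes. The relocateOnce callback and the retry delay are hypothetical;
// error_code_data_move_cancelled and error_code_data_move_dest_team_not_found are generated
// from the ERROR() entries added above.
#include <functional>

ACTOR Future<Void> relocateWithRetry(std::function<Future<Void>()> relocateOnce) {
	loop {
		try {
			wait(relocateOnce());
			return Void();
		} catch (Error& e) {
			if (e.code() == error_code_data_move_cancelled) {
				// The data move was cancelled (e.g. by cleanUpDataMove() or a conflicting
				// move); stop here and let whoever cancelled it reschedule the shard.
				return Void();
			}
			if (e.code() != error_code_data_move_dest_team_not_found) {
				throw;
			}
			// The chosen destination team no longer exists; wait and retry so that a new
			// team can be selected.
			wait(delay(1.0));
		}
	}
}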