Change SHARD_ENCODE_LOCATION_METADATA to a server knob. (#7770)
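
SHARD_ENCODE_LOCATION_METADATA gates the physical-shard code paths in data
distribution, moveKeys, simulation setup, and storage server startup. Every
reader of the flag runs in a server role, so the knob belongs in ServerKnobs
rather than ClientKnobs. Its default (false) and its BUGGIFY randomization
carry over unchanged; call sites change uniformly:

-    if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { ... }
+    if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) { ... }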

Co-authored-by: He Liu <heliu@apple.com>
Authored by He Liu on 2022-08-03 13:51:40 -07:00; committed by GitHub
parent 4f4d32de8e
commit fa418fd784
10 changed files with 23 additions and 22 deletions

View File

@@ -111,7 +111,6 @@ void ClientKnobs::initialize(Randomize randomize) {
 init( RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT, 20 );
 init( QUARANTINE_TSS_ON_MISMATCH, true ); if( randomize && BUGGIFY ) QUARANTINE_TSS_ON_MISMATCH = false; // if true, a tss mismatch will put the offending tss in quarantine. If false, it will just be killed
 init( CHANGE_FEED_EMPTY_BATCH_TIME, 0.005 );
-init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
 //KeyRangeMap
 init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@@ -167,6 +167,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 init( PRIORITY_SPLIT_SHARD, 950 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350;
 // Data distribution
+init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
 init( READ_REBALANCE_CPU_THRESHOLD, 15.0 );
 init( READ_REBALANCE_SRC_PARALLELISM, 20 );
 init( READ_REBALANCE_SHARD_TOPK, READ_REBALANCE_SRC_PARALLELISM * 2 );
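
A note on the knob idiom above, for readers outside the FoundationDB codebase:
init( KNOB, value ) sets the production default, and the trailing
"if( randomize && BUGGIFY ) ..." lets the deterministic simulator randomly
flip the knob so correctness tests also exercise the non-default path. A
simplified, self-contained sketch of that mechanism (a stand-in struct and a
fake buggify(); not FoundationDB's actual macros):

#include <cstdlib>
#include <iostream>

// Stand-ins for the knob machinery; in FoundationDB, BUGGIFY only ever
// fires inside the deterministic simulator.
static bool g_simulated = true;
static bool buggify() { return g_simulated && std::rand() % 4 == 0; }

struct ServerKnobsSketch {
    bool SHARD_ENCODE_LOCATION_METADATA;

    void initialize(bool randomize) {
        // Production default: shard-encoded location metadata is off...
        SHARD_ENCODE_LOCATION_METADATA = false;
        // ...but simulation randomly turns it on so the shard-id-aware
        // data-move paths get test coverage while the default stays false.
        if (randomize && buggify())
            SHARD_ENCODE_LOCATION_METADATA = true;
    }
};

int main() {
    ServerKnobsSketch knobs;
    knobs.initialize(/*randomize=*/true);
    std::cout << "SHARD_ENCODE_LOCATION_METADATA = "
              << knobs.SHARD_ENCODE_LOCATION_METADATA << "\n";
}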

View File

@@ -109,7 +109,6 @@ public:
 int RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT;
 bool QUARANTINE_TSS_ON_MISMATCH;
 double CHANGE_FEED_EMPTY_BATCH_TIME;
-bool SHARD_ENCODE_LOCATION_METADATA;
 // KeyRangeMap
 int KRM_GET_RANGE_LIMIT;

View File

@@ -163,6 +163,8 @@ public:
 int PRIORITY_SPLIT_SHARD;
+// Data distribution
+bool SHARD_ENCODE_LOCATION_METADATA; // If true, location metadata will contain shard ID.
 double READ_REBALANCE_CPU_THRESHOLD; // read rebalance only happens if the source servers' CPU > threshold
 int READ_REBALANCE_SRC_PARALLELISM; // the max number of times a server can become a source server within a certain interval
 int READ_REBALANCE_SHARD_TOPK; // top k shards from which to select randomly for read-rebalance

View File

@@ -349,7 +349,7 @@ class DDTxnProcessorImpl {
 // a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
 result->shards.push_back(DDShardInfo(allKeys.end));
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
 for (int shard = 0; shard < result->shards.size() - 1; ++shard) {
 const DDShardInfo& iShard = result->shards[shard];
 KeyRangeRef keys = KeyRangeRef(iShard.key, result->shards[shard + 1].key);

View File

@@ -491,7 +491,7 @@ public:
 for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
 const DataMoveMetaData& meta = it.value()->meta;
-if (it.value()->isCancelled() || (it.value()->valid && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
+if (it.value()->isCancelled() || (it.value()->valid && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
 RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
 rs.dataMoveId = meta.id;
 rs.cancelled = true;

View File

@@ -1099,7 +1099,7 @@ struct DDQueueData {
 }
 Future<Void> fCleanup =
-    CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA ? cancelDataMove(this, rd.keys, ddEnabledState) : Void();
+    SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA ? cancelDataMove(this, rd.keys, ddEnabledState) : Void();
 // If there is a job in flight that wants data relocation which we are about to cancel/modify,
 // make sure that we keep the relocation intent for the job that we launch
@@ -1121,13 +1121,13 @@
 rrs.keys = ranges[r];
 if (rd.keys == ranges[r] && rd.isRestore()) {
 ASSERT(rd.dataMove != nullptr);
-ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
+ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
 rrs.dataMoveId = rd.dataMove->meta.id;
 } else {
 ASSERT_WE_THINK(!rd.isRestore()); // Restored data move should not overlap.
 // TODO(psm): The shard id is determined by DD.
 rrs.dataMove.reset();
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 rrs.dataMoveId = deterministicRandom()->randomUniqueID();
 } else {
 rrs.dataMoveId = anonymousShardId;
@@ -1290,7 +1290,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
 self->suppressIntervals = 0;
 }
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin);
 ASSERT(inFlightRange.range() == rd.keys);
 ASSERT(inFlightRange.value().randomId == rd.randomId);
@@ -1332,7 +1332,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
 bestTeams.clear();
 // Get team from teamCollections in different DCs and find the best one
 while (tciIndex < self->teamCollections.size()) {
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && rd.isRestore()) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && rd.isRestore()) {
 auto req = GetTeamRequest(tciIndex == 0 ? rd.dataMove->primaryDest : rd.dataMove->remoteDest);
 Future<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> fbestTeam =
 brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req));
@@ -1579,7 +1579,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self,
 CancelConflictingDataMoves::False);
 } else {
 self->fetchKeysComplete.insert(rd);
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 auto ranges = self->dataMoves.getAffectedRangesAfterInsertion(rd.keys);
 if (ranges.size() == 1 && static_cast<KeyRange>(ranges[0]) == rd.keys &&
 ranges[0].value.id == rd.dataMoveId && !ranges[0].value.cancel.isValid()) {

View File

@@ -296,7 +296,7 @@ ACTOR Future<Void> cleanUpSingleShardDataMove(Database occ,
 FlowLock* cleanUpDataMoveParallelismLock,
 UID dataMoveId,
 const DDEnabledState* ddEnabledState) {
-ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
+ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
 TraceEvent(SevInfo, "CleanUpSingleShardDataMoveBegin", dataMoveId).detail("Range", keys);
 loop {
@@ -1226,7 +1226,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
 UID relocationIntervalId,
 const DDEnabledState* ddEnabledState,
 CancelConflictingDataMoves cancelConflictingDataMoves) {
-ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
+ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
 state Future<Void> warningLogger = logWarningAfter("StartMoveShardsTooLong", 600, servers);
 wait(startMoveKeysLock->take(TaskPriority::DataDistributionLaunch));
@@ -1561,7 +1561,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
 UID relocationIntervalId,
 std::map<UID, StorageServerInterface> tssMapping,
 const DDEnabledState* ddEnabledState) {
-ASSERT(CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
+ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
 state KeyRange keys = targetKeys;
 state Future<Void> warningLogger = logWarningAfter("FinishMoveShardsTooLong", 600, destinationTeam);
 state int retries = 0;
@@ -2225,7 +2225,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
 const UID shardId = newShardId(deterministicRandom()->randomUInt64(), AssignEmptyRange::True);
 // Assign the shard to teamForDroppedRange in keyServer space.
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 tr.set(keyServersKey(it.key), keyServersValue(teamForDroppedRange, {}, shardId, UID()));
 } else {
 tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange));
@@ -2242,7 +2242,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
 // Assign the shard to the new team as an empty range.
 // Note, there could be data loss.
 for (const UID& id : teamForDroppedRange) {
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 actors.push_back(krmSetRangeCoalescing(
 &tr, serverKeysPrefixFor(id), range, allKeys, serverKeysValue(shardId)));
 } else {
@@ -2462,7 +2462,7 @@ ACTOR Future<Void> moveKeys(Database cx,
 state std::map<UID, StorageServerInterface> tssMapping;
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 wait(startMoveShards(cx,
 dataMoveId,
 keys,
@@ -2487,7 +2487,7 @@ ACTOR Future<Void> moveKeys(Database cx,
 state Future<Void> completionSignaller =
 checkFetchingState(cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId, tssMapping);
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 wait(finishMoveShards(cx,
 dataMoveId,
 keys,
@@ -2570,7 +2570,7 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
 // We have to set this range in two blocks, because the master tracking of "keyServersLocations" depends on a change
 // to a specific
 // key (keyServersKeyServersKey)
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 const UID teamId = deterministicRandom()->randomUniqueID();
 ksValue = keyServersValue(serverSrcUID, /*dest=*/std::vector<UID>(), teamId, UID());
 krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());

View File

@@ -1792,7 +1792,7 @@ void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) {
 void SimulationConfig::setTss(const TestConfig& testConfig) {
 int tssCount = 0;
 // TODO: Support TSS in SHARD_ENCODE_LOCATION_METADATA mode.
-if (!testConfig.simpleConfig && !testConfig.disableTss && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
+if (!testConfig.simpleConfig && !testConfig.disableTss && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
 deterministicRandom()->random01() < 0.25) {
 // 1 or 2 tss
 tssCount = deterministicRandom()->randomInt(1, 3);
@@ -2394,7 +2394,7 @@ ACTOR void setupAndRun(std::string dataFolder,
 state bool allowDisablingTenants = testConfig.allowDisablingTenants;
 state bool allowCreatingTenants = true;
-if (!CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+if (!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
 testConfig.storageEngineExcludeTypes.push_back(5);
 }

View File

@@ -1251,7 +1251,7 @@ public:
 newestAvailableVersion.insert(allKeys, invalidVersion);
 newestDirtyVersion.insert(allKeys, invalidVersion);
-if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
+if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
 (SERVER_KNOBS->STORAGE_SERVER_SHARD_AWARE || storage->shardAware())) {
 addShard(ShardInfo::newShard(this, StorageServerShard::notAssigned(allKeys)));
 } else {
@@ -10358,7 +10358,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
 Reference<AsyncVar<ServerDBInfo> const> db,
 std::string folder) {
 state StorageServer self(persistentData, db, ssi);
-self.shardAware = CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
+self.shardAware = SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA &&
 (SERVER_KNOBS->STORAGE_SERVER_SHARD_AWARE || persistentData->shardAware());
 state Future<Void> ssCore;
 self.clusterId.send(clusterId);