This commit is contained in:
Yao Xiao 2024-07-18 11:28:34 -07:00 committed by GitHub
parent 078aaf153c
commit c630fa2296
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 46 deletions

View File

@@ -359,6 +359,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL, 30.0 ); if( isSimulated ) DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL = 5.0;
init( REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN, SHARD_MIN_BYTES_PER_KSEC);
init( DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD, true ); if ( isSimulated ) DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD = deterministicRandom()->coinflip();
init( DD_WAIT_TSS_DATA_MOVE_DELAY, 15.0 ); if (isSimulated) DD_WAIT_TSS_DATA_MOVE_DELAY = deterministicRandom()->randomInt(5, 30);
// Large teams are disabled when SHARD_ENCODE_LOCATION_METADATA is enabled
init( DD_MAX_SHARDS_ON_LARGE_TEAMS, 100 ); if( randomize && BUGGIFY ) DD_MAX_SHARDS_ON_LARGE_TEAMS = deterministicRandom()->randomInt(0, 3);

View File

@@ -344,6 +344,7 @@ public:
int64_t REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN;
bool DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD; // Enable to allow storage queue rebalancer to move
// light-traffic shards out of the overloading server
double DD_WAIT_TSS_DATA_MOVE_DELAY;
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor

View File

@@ -19,6 +19,7 @@
*/
#include <vector>
#include <limits.h>
#include "fdbclient/BlobRestoreCommon.h"
#include "fdbclient/FDBOptions.g.h"
@@ -2001,10 +2002,9 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
wait(finishMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch));
state FlowLock::Releaser releaser = FlowLock::Releaser(*finishMoveKeysParallelismLock);
state std::unordered_set<UID> tssToIgnore;
// try waiting for tss for a 2 loops, give up if they're behind to not affect the rest of the cluster
state int waitForTSSCounter = 2;
state bool runPreCheck = true;
state bool skipTss = false;
state double ssReadyTime = std::numeric_limits<double>::max();
ASSERT(!destinationTeam.empty());
@@ -2161,16 +2161,16 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
// Wait for new destination servers to fetch the data range.
serverReady.reserve(storageServerInterfaces.size());
tssReady.reserve(storageServerInterfaces.size());
tssReadyInterfs.reserve(storageServerInterfaces.size());
for (int s = 0; s < storageServerInterfaces.size(); s++) {
serverReady.push_back(waitForShardReady(
storageServerInterfaces[s], range, tr.getReadVersion().get(), GetShardStateRequest::READABLE));
if (skipTss)
continue;
auto tssPair = tssMapping.find(storageServerInterfaces[s].id());
if (tssPair != tssMapping.end() && waitForTSSCounter > 0 &&
!tssToIgnore.count(tssPair->second.id())) {
if (tssPair != tssMapping.end()) {
tssReadyInterfs.push_back(tssPair->second);
tssReady.push_back(waitForShardReady(
tssPair->second, range, tr.getReadVersion().get(), GetShardStateRequest::READABLE));
@@ -2188,40 +2188,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
Void(),
TaskPriority::MoveKeys));
// Check to see if we're waiting only on tss. If so, decrement the waiting counter.
// If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing
// the data move.
if (tssReady.size()) {
bool allSSDone = true;
for (auto& f : serverReady) {
allSSDone &= f.isReady() && !f.isError();
if (!allSSDone) {
break;
}
}
if (allSSDone) {
bool anyTssNotDone = false;
for (auto& f : tssReady) {
if (!f.isReady() || f.isError()) {
anyTssNotDone = true;
waitForTSSCounter--;
break;
}
}
if (anyTssNotDone && waitForTSSCounter == 0) {
for (int i = 0; i < tssReady.size(); i++) {
if (!tssReady[i].isReady() || tssReady[i].isError()) {
tssToIgnore.insert(tssReadyInterfs[i].id());
}
}
}
}
}
std::vector<UID> readyServers;
state std::vector<UID> readyServers;
for (int s = 0; s < serverReady.size(); ++s) {
if (serverReady[s].isReady() && !serverReady[s].isError()) {
readyServers.push_back(storageServerInterfaces[s].uniqueID);
@@ -2229,7 +2196,24 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
}
int tssCount = 0;
for (int s = 0; s < tssReady.size(); s++) {
tssCount += tssReady[s].isReady() && !tssReady[s].isError();
if (tssReady[s].isReady() && !tssReady[s].isError()) {
tssCount += 1;
}
}
if (readyServers.size() == serverReady.size() && !skipTss) {
ssReadyTime = std::min(now(), ssReadyTime);
if (tssCount < tssReady.size() &&
now() - ssReadyTime >= SERVER_KNOBS->DD_WAIT_TSS_DATA_MOVE_DELAY) {
skipTss = true;
TraceEvent(SevWarnAlways, "FinishMoveShardsSkipTSS")
.detail("DataMoveID", dataMoveId)
.detail("ReadyServers", describe(readyServers))
.detail("NewDestinations", describe(newDestinations))
.detail("ReadyTSS", tssCount)
.detail("TSSInfo", describe(tssReadyInterfs))
.detail("SSReadyTime", ssReadyTime);
}
}
TraceEvent(sevDm, "FinishMoveShardsWaitedServers", relocationIntervalId)
@@ -2309,7 +2293,7 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
retries++;
if (retries % 10 == 0) {
TraceEvent(retries == 20 ? SevWarnAlways : SevWarn,
"RelocateShard_FinishMoveKeysRetrying",
"RelocateShard_FinishMoveShardsRetrying",
relocationIntervalId)
.error(err)
.detail("DataMoveID", dataMoveId);

View File

@@ -10386,15 +10386,14 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
} else {
ASSERT(shard->adding != nullptr || shard->moveInShard != nullptr);
if (shard->desiredShardId != desiredId) {
TraceEvent(SevError, "CSKConflictingMoveInShards", data->thisServerID)
TraceEvent(SevWarnAlways, "CSKConflictingMoveInShards", data->thisServerID)
.detail("DataMoveID", dataMoveId)
.detail("Range", range)
.detailf("TargetShard", "%016llx", desiredId)
.detailf("CurrentShard", "%016llx", shard->desiredShardId)
.detail("IsTSS", data->isTss())
.detail("Version", cVer);
// TODO(heliu): Mark the data move as failed locally, instead of crashing ss.
ASSERT(false);
throw data_move_conflict();
} else {
TraceEvent(SevInfo, "CSKMoveInToSameShard", data->thisServerID)
.detail("DataMoveID", dataMoveId)

View File

@@ -107,6 +107,7 @@ ERROR( unknown_storage_engine, 1082, "Storage engine type is not recognized." )
ERROR( duplicate_snapshot_request, 1083, "A duplicate snapshot request has been sent, the old request is discarded.")
ERROR( dd_config_changed, 1084, "DataDistribution configuration changed." )
ERROR( consistency_check_urgent_task_failed, 1085, "Consistency check urgent task is failed")
ERROR( data_move_conflict, 1086, "Data move conflict in SS")
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )