diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 38e7beb341..8cbb77defb 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -210,6 +210,7 @@ void ServerKnobs::initialize(Randomize _randomize, ClientKnobs* clientKnobs, IsS init( ALL_DATA_REMOVED_DELAY, 1.0 ); init( INITIAL_FAILURE_REACTION_DELAY, 30.0 ); if( randomize && BUGGIFY ) INITIAL_FAILURE_REACTION_DELAY = 0.0; init( CHECK_TEAM_DELAY, 30.0 ); + init( PERPETUAL_WIGGLE_DELAY, 50.0 ); init( LOG_ON_COMPLETION_DELAY, DD_QUEUE_LOGGING_INTERVAL ); init( BEST_TEAM_MAX_TEAM_TRIES, 10 ); init( BEST_TEAM_OPTION_COUNT, 4 ); diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index c704541a2e..30468c7e84 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -160,6 +160,7 @@ public: double ALL_DATA_REMOVED_DELAY; double INITIAL_FAILURE_REACTION_DELAY; double CHECK_TEAM_DELAY; + double PERPETUAL_WIGGLE_DELAY; double LOG_ON_COMPLETION_DELAY; int BEST_TEAM_MAX_TEAM_TRIES; int BEST_TEAM_OPTION_COUNT; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 121409337f..3dd6aceeec 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -2840,8 +2840,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> { } this->wiggle_addresses.push_back(addr); this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING); - moveFutures.push_back( - waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this)); + moveFutures.push_back(info->onRemoved); } if (!moveFutures.empty()) { this->restartRecruiting.trigger(); @@ -3898,29 +3897,27 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL // to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0. ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) { state ReadYourWritesTransaction tr(teamCollection->cx); - state Value writeValue = LiteralStringRef("0"); + state Value writeValue; loop { try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); Optional<Value> value = wait(tr.get(wigglingStorageServerKey)); if (teamCollection->pid2server_info.empty()) { - tr.set(wigglingStorageServerKey, LiteralStringRef("0")); + writeValue = LiteralStringRef(""); } else { Value pid = teamCollection->pid2server_info.begin()->first; if (value.present()) { auto nextIt = teamCollection->pid2server_info.upper_bound(value.get()); if (nextIt == teamCollection->pid2server_info.end()) { - tr.set(wigglingStorageServerKey, pid); writeValue = pid; } else { - tr.set(wigglingStorageServerKey, nextIt->first); writeValue = nextIt->first; } } else { - tr.set(wigglingStorageServerKey, pid); writeValue = pid; } } + tr.set(wigglingStorageServerKey, writeValue); wait(tr.commit()); break; } catch (Error& e) { @@ -3939,10 +3936,20 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal, FutureStream<Void> finishStorageWiggleSignal, DDTeamCollection* teamCollection) { + state int lastFinishTime = now(); loop { choose { when(wait(stopSignal->onChange())) {} - when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); } + when(waitNext(finishStorageWiggleSignal)) { + state bool takeRest = true; // delay to avoid delete and update ServerList too frequently + while (takeRest) { + wait(delayJittered(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY)); + // there must not have other teams to place wiggled data + takeRest = teamCollection->server_info.size() <= teamCollection->configuration.storageTeamSize || + teamCollection->machine_info.size() < teamCollection->configuration.storageTeamSize; + } + wait(updateNextWigglingStoragePID(teamCollection)); + } } if (stopSignal->get()) { break; @@ -3976,17 +3983,24 @@ ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(DDTe } // periodically check whether the cluster is healthy if we continue perpetual wiggle -ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self) { +ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self, int* extraTeamCount) { + state int pausePenalty = 1; loop { Promise<int> countp; self->getUnhealthyRelocationCount.send(countp); int count = wait(countp.getFuture()); // pause wiggle when // a. DDQueue is busy with unhealthy relocation request - // b. no healthy team + // b. healthy teams are not enough // c. the overall disk space is not enough - if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount == 0 || + if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount <= *extraTeamCount || self->bestTeamStuck) { + // if we pause wiggle not because the reason a, increase extraTeamCount. This helps avoid oscillation + // between pause and non-pause status. + if ((self->healthyTeamCount <= *extraTeamCount || self->bestTeamStuck) && !self->pauseWiggle->get()) { + *extraTeamCount = std::min(*extraTeamCount + pausePenalty, (int)self->teams.size()); + pausePenalty = std::min(pausePenalty * 2, (int)self->teams.size()); + } self->pauseWiggle->set(true); } else { self->pauseWiggle->set(false); @@ -4004,9 +4018,9 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal, const DDEnabledState* ddEnabledState) { state Future<Void> watchFuture = Never(); state Future<Void> moveFinishFuture = Never(); - state Future<Void> ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self); + state int extraTeamCount = 0; + state Future<Void> ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self, &extraTeamCount); state int movingCount = 0; - state vector<UID> excludedServerIds; state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self)); ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed self->wigglingPid = Optional<Key>(res.second); @@ -4020,26 +4034,19 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal, self->includeStorageServersForWiggle(); TraceEvent("PerpetualStorageWigglePause", self->distributorId) .detail("ProcessId", pid) + .detail("ExtraHealthyTeamCount", extraTeamCount) + .detail("HealthyTeamCount", self->healthyTeamCount) .detail("StorageCount", movingCount); } else { - // pre-check whether wiggling chosen servers still satisfy replica requirement - excludedServerIds.clear(); - for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { - excludedServerIds.push_back(info->id); - } - if (_exclusionSafetyCheck(excludedServerIds, self)) { - TEST(true); // start wiggling - auto fv = self->excludeStorageServersForWiggle(pid); - movingCount = fv.size(); - moveFinishFuture = waitForAll(fv); - TraceEvent("PerpetualStorageWiggleStart", self->distributorId) - .detail("ProcessId", pid) - .detail("StorageCount", movingCount); - } else { - TEST(true); // skip wiggling current process - TraceEvent("PerpetualStorageWiggleSkip", self->distributorId).detail("ProcessId", pid.toString()); - moveFinishFuture = Void(); - } + TEST(true); // start wiggling + auto fv = self->excludeStorageServersForWiggle(pid); + movingCount = fv.size(); + moveFinishFuture = waitForAll(fv); + TraceEvent("PerpetualStorageWiggleStart", self->distributorId) + .detail("ProcessId", pid) + .detail("ExtraHealthyTeamCount", extraTeamCount) + .detail("HealthyTeamCount", self->healthyTeamCount) + .detail("StorageCount", movingCount); } } @@ -4055,9 +4062,9 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal, wait(delayJittered(5.0, TaskPriority::DataDistributionLow)); } when(wait(moveFinishFuture)) { - TEST(true); // finish wiggling this process ASSERT(self->wigglingPid.present()); StringRef pid = self->wigglingPid.get(); + TEST(pid != LiteralStringRef("")); // finish wiggling this process moveFinishFuture = Never(); self->includeStorageServersForWiggle(); @@ -4068,6 +4075,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal, self->wigglingPid.reset(); watchFuture = res.first; finishStorageWiggleSignal.send(Void()); + extraTeamCount = std::max(0, extraTeamCount - 1); } when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {} } @@ -4093,7 +4101,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal, ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection, const DDEnabledState* ddEnabledState) { state int speed = 0; - state AsyncVar<bool> stopWiggleSignal(false); + state AsyncVar<bool> stopWiggleSignal(true); state PromiseStream<Void> finishStorageWiggleSignal; state SignalableActorCollection collection; teamCollection->pauseWiggle = makeReference<AsyncVar<bool>>(true); @@ -4119,11 +4127,13 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio collection.add(perpetualStorageWiggler( &stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState)); TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId); - } else if (speed == 0 && !stopWiggleSignal.get()) { - stopWiggleSignal.set(true); - wait(collection.signalAndReset()); + } else if (speed == 0) { + if (!stopWiggleSignal.get()) { + stopWiggleSignal.set(true); + wait(collection.signalAndReset()); + teamCollection->pauseWiggle->set(true); + } TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId); - teamCollection->pauseWiggle->set(true); } wait(watchFuture); break; diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index bb87bf4e33..ba8e0f416a 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1662,6 +1662,8 @@ ACTOR Future<Void> dataDistributionQueue(Database cx, self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]) .detail("PriorityRebalanceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]) + .detail("PriorityStorageWiggle", + self.priority_relocations[SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE]) .detail("PriorityTeamHealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_HEALTHY]) .detail("PriorityTeamContainsUndesiredServer", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER])