Add delay between quickly re-recruiting the same singleton process, to avoid recruit thrashing when there are temporarily multiple cluster controllers (#7000)

Josh Slocum 2022-04-28 17:45:09 -05:00 committed by GitHub
parent 13a388ff14
commit db6d7396ca
3 changed files with 62 additions and 33 deletions
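
The heart of this change is the throttle rule added to the cluster controller below: each re-recruit of a singleton is delayed by whatever remains of CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL (0.5 s by default), measured from the previous recruit request. A minimal standalone sketch of that rule, with flow's now() replaced by an explicit timestamp parameter and made-up trigger times in main():

// Standalone approximation of the throttle added in this commit; everything
// here other than the 0.5 s default and the max() formula is illustrative.
#include <algorithm>
#include <cstdio>

static constexpr double kInterval = 0.5; // CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL default

struct ThrottleSketch {
    double lastRecruitStart = -1.0; // -1 means no recruitment has happened yet

    // Returns how long to delay the recruitment triggered at time `now` (seconds).
    double newRecruitment(double now) {
        double waitTime = std::max(0.0, lastRecruitStart + kInterval - now);
        lastRecruitStart = now;
        return waitTime;
    }
};

int main() {
    ThrottleSketch throttler;
    const double triggers[] = { 0.0, 0.1, 0.25, 1.5 }; // a thrash burst, then a quiet recruit
    for (double t : triggers)
        std::printf("trigger at %.2fs -> wait %.2fs\n", t, throttler.newRecruitment(t));
    // Prints waits of 0.00, 0.40, 0.35 and 0.00: back-to-back triggers are
    // pushed out, while an isolated recruitment is not delayed at all.
}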

View File

@@ -542,6 +542,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( CC_ENABLE_ENTIRE_SATELLITE_MONITORING, false );
     init( CC_SATELLITE_DEGRADATION_MIN_COMPLAINER, 3 );
     init( CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER, 3 );
+    init( CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL, 0.5 );
     init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
     init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );

View File

@@ -481,6 +481,8 @@ public:
                                                  // be determined as degraded worker.
     int CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER; // The minimum amount of degraded server in satellite DC to be
                                                  // determined as degraded satellite.
+    double CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL; // The interval to prevent re-recruiting the same singleton if a
+                                                     // recruiting fight between two cluster controllers occurs.
 
     // Knobs used to select the best policy (via monte carlo)
     int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

View File

@@ -1933,8 +1933,24 @@ ACTOR Future<Void> handleForcedRecoveries(ClusterControllerData* self, ClusterCo
     }
 }
 
-ACTOR Future<Void> startDataDistributor(ClusterControllerData* self) {
-    wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
+struct SingletonRecruitThrottler {
+    double lastRecruitStart;
+
+    SingletonRecruitThrottler() : lastRecruitStart(-1) {}
+
+    double newRecruitment() {
+        double n = now();
+        double waitTime =
+            std::max(0.0, (lastRecruitStart + SERVER_KNOBS->CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL - n));
+        lastRecruitStart = n;
+        return waitTime;
+    }
+};
+
+ACTOR Future<Void> startDataDistributor(ClusterControllerData* self, double waitTime) {
+    // If master fails at the same time, give it a chance to clear master PID.
+    // Also wait to avoid too many consecutive recruits in a small time window.
+    wait(delay(waitTime));
 
     TraceEvent("CCStartDataDistributor", self->id).log();
     loop {
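
Each monitor actor further down keeps its own state SingletonRecruitThrottler and feeds newRecruitment() into the matching start actor, so the delay is applied per singleton. The shape of that loop, rewritten as plain C++ rather than a flow ACTOR and with singletonAlive() / waitForChange() / recruitSingleton() standing in for the real cluster-controller checks, is roughly:

// Rough, non-flow rendering of a singleton monitor loop with the throttle;
// the three std::function hooks are placeholders, not FoundationDB APIs.
#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

struct MonitorLoopSketch {
    double lastRecruitStart = -1.0;
    double intervalSeconds = 0.5; // CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL

    static double secondsNow() {
        using namespace std::chrono;
        return duration<double>(steady_clock::now().time_since_epoch()).count();
    }

    void run(const std::function<bool()>& singletonAlive,
             const std::function<void()>& waitForChange,
             const std::function<void()>& recruitSingleton) {
        while (true) {
            if (singletonAlive()) {
                waitForChange(); // nothing to do until the singleton fails or is demoted
            } else {
                // Same arithmetic as SingletonRecruitThrottler::newRecruitment().
                double n = secondsNow();
                double waitTime = std::max(0.0, lastRecruitStart + intervalSeconds - n);
                lastRecruitStart = n;
                std::this_thread::sleep_for(std::chrono::duration<double>(waitTime));
                recruitSingleton(); // corresponds to wait(startXxx(self, waitTime))
            }
        }
    }
};

One detail worth noting from the diff: the throttler lives with each monitor (one per singleton type), not globally, so throttling a blob manager recruit never delays a ratekeeper recruit.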
@@ -2003,6 +2019,7 @@ ACTOR Future<Void> startDataDistributor(ClusterControllerData* self) {
 }
 
 ACTOR Future<Void> monitorDataDistributor(ClusterControllerData* self) {
+    state SingletonRecruitThrottler recruitThrottler;
     while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
         wait(self->db.serverInfo->onChange());
     }
@@ -2019,13 +2036,15 @@ ACTOR Future<Void> monitorDataDistributor(ClusterControllerData* self) {
                 when(wait(self->recruitDistributor.onChange())) {}
             }
         } else {
-            wait(startDataDistributor(self));
+            wait(startDataDistributor(self, recruitThrottler.newRecruitment()));
         }
     }
 }
 
-ACTOR Future<Void> startRatekeeper(ClusterControllerData* self) {
-    wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
+ACTOR Future<Void> startRatekeeper(ClusterControllerData* self, double waitTime) {
+    // If master fails at the same time, give it a chance to clear master PID.
+    // Also wait to avoid too many consecutive recruits in a small time window.
+    wait(delay(waitTime));
 
     TraceEvent("CCStartRatekeeper", self->id).log();
     loop {
@@ -2091,6 +2110,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData* self) {
 }
 
 ACTOR Future<Void> monitorRatekeeper(ClusterControllerData* self) {
+    state SingletonRecruitThrottler recruitThrottler;
     while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
         wait(self->db.serverInfo->onChange());
     }
@@ -2107,34 +2127,15 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData* self) {
                 when(wait(self->recruitRatekeeper.onChange())) {}
             }
         } else {
-            wait(startRatekeeper(self));
+            wait(startRatekeeper(self, recruitThrottler.newRecruitment()));
         }
     }
 }
 
-// Acquires the BM lock by getting the next epoch no.
-ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
-    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
-
-    loop {
-        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-        try {
-            Optional<Value> oldEpoch = wait(tr->get(blobManagerEpochKey));
-            state int64_t newEpoch = oldEpoch.present() ? decodeBlobManagerEpochValue(oldEpoch.get()) + 1 : 1;
-            tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch));
-            wait(tr->commit());
-            TraceEvent(SevDebug, "CCNextBlobManagerEpoch", self->id).detail("Epoch", newEpoch);
-            return newEpoch;
-        } catch (Error& e) {
-            wait(tr->onError(e));
-        }
-    }
-}
-
-ACTOR Future<Void> startEncryptKeyProxy(ClusterControllerData* self) {
-    wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
+ACTOR Future<Void> startEncryptKeyProxy(ClusterControllerData* self, double waitTime) {
+    // If master fails at the same time, give it a chance to clear master PID.
+    // Also wait to avoid too many consecutive recruits in a small time window.
+    wait(delay(waitTime));
 
     TraceEvent("CCEKP_Start", self->id).log();
     loop {
@@ -2208,6 +2209,7 @@ ACTOR Future<Void> startEncryptKeyProxy(ClusterControllerData* self) {
 }
 
 ACTOR Future<Void> monitorEncryptKeyProxy(ClusterControllerData* self) {
+    state SingletonRecruitThrottler recruitThrottler;
     loop {
         if (self->db.serverInfo->get().encryptKeyProxy.present() && !self->recruitEncryptKeyProxy.get()) {
             choose {
@@ -2219,13 +2221,36 @@ ACTOR Future<Void> monitorEncryptKeyProxy(ClusterControllerData* self) {
                 when(wait(self->recruitEncryptKeyProxy.onChange())) {}
             }
         } else {
-            wait(startEncryptKeyProxy(self));
+            wait(startEncryptKeyProxy(self, recruitThrottler.newRecruitment()));
         }
     }
 }
 
-ACTOR Future<Void> startBlobManager(ClusterControllerData* self) {
-    wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
+// Acquires the BM lock by getting the next epoch no.
+ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
+    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
+
+    loop {
+        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+        try {
+            Optional<Value> oldEpoch = wait(tr->get(blobManagerEpochKey));
+            state int64_t newEpoch = oldEpoch.present() ? decodeBlobManagerEpochValue(oldEpoch.get()) + 1 : 1;
+            tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch));
+            wait(tr->commit());
+            TraceEvent(SevDebug, "CCNextBlobManagerEpoch", self->id).detail("Epoch", newEpoch);
+            return newEpoch;
+        } catch (Error& e) {
+            wait(tr->onError(e));
+        }
+    }
+}
+
+ACTOR Future<Void> startBlobManager(ClusterControllerData* self, double waitTime) {
+    // If master fails at the same time, give it a chance to clear master PID.
+    // Also wait to avoid too many consecutive recruits in a small time window.
+    wait(delay(waitTime));
 
     TraceEvent("CCStartBlobManager", self->id).log();
     loop {
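
getNextBMEpoch itself is only moved in this hunk, not modified: it bumps the epoch stored under blobManagerEpochKey inside a retry loop, so every newly recruited blob manager ends up holding a strictly larger epoch than any predecessor. A toy version of that read-increment-write pattern, using a mock in-memory store and transaction type rather than the FDB client API:

// Toy version of the "acquire the lock by taking the next epoch" pattern used
// by getNextBMEpoch: read the stored epoch, write epoch + 1, retry on conflict.
// Store and Transaction here are illustrative stand-ins only.
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <optional>

struct Store {
    std::mutex m;
    uint64_t version = 0;                // bumped on every successful commit
    std::optional<int64_t> managerEpoch; // value behind "blobManagerEpochKey"
};

struct Transaction {
    Store& store;
    uint64_t readVersion;
    explicit Transaction(Store& s) : store(s) {
        std::lock_guard<std::mutex> g(s.m);
        readVersion = s.version;
    }

    std::optional<int64_t> getEpoch() {
        std::lock_guard<std::mutex> g(store.m);
        return store.managerEpoch;
    }

    // Returns false (conflict) if someone else committed since we started reading.
    bool commitEpoch(int64_t newEpoch) {
        std::lock_guard<std::mutex> g(store.m);
        if (store.version != readVersion)
            return false;
        store.managerEpoch = newEpoch;
        store.version++;
        return true;
    }
};

int64_t getNextEpoch(Store& store) {
    while (true) { // retry loop, like the loop { ... wait(tr->onError(e)) } above
        Transaction tr(store);
        std::optional<int64_t> old = tr.getEpoch();
        int64_t next = old.has_value() ? *old + 1 : 1;
        if (tr.commitEpoch(next))
            return next; // this caller now owns the largest epoch
    }
}

int main() {
    Store store;
    std::printf("epoch %lld\n", (long long)getNextEpoch(store)); // 1
    std::printf("epoch %lld\n", (long long)getNextEpoch(store)); // 2
}

A second cluster controller racing through the same sequence either commits first and takes the higher epoch, or conflicts and retries, which is what lets the rest of the system disregard a manager carrying a stale epoch.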
@@ -2322,6 +2347,7 @@ ACTOR Future<Void> watchBlobGranulesConfigKey(ClusterControllerData* self) {
 }
 
 ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
+    state SingletonRecruitThrottler recruitThrottler;
     while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
         wait(self->db.serverInfo->onChange());
     }
@@ -2352,7 +2378,7 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
             }
         } else if (self->db.blobGranulesEnabled.get()) {
             // if there is no blob manager present but blob granules are now enabled, recruit a BM
-            wait(startBlobManager(self));
+            wait(startBlobManager(self, recruitThrottler.newRecruitment()));
         } else {
             // if there is no blob manager present and blob granules are disabled, wait for a config change
             wait(self->db.blobGranulesEnabled.onChange());
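
monitorBlobManager is the one monitor with an extra gate: it only recruits when blob granules are enabled and no blob manager is present, and with this change that recruit goes through the same throttle as the other singletons. A condensed reading of the branch structure visible in this hunk, with illustrative names rather than the real ClusterControllerData fields:

// Condensed decision logic of the hunk above; the enum and field names are
// illustrative stand-ins, and only the branches visible in the diff are modelled.
#include <algorithm>
#include <cstdio>

enum class BmAction { NothingToRecruit, WaitForConfigChange, Recruit };

struct BmDecisionSketch {
    double lastRecruitStart = -1.0;
    double interval = 0.5; // CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL

    BmAction decide(bool blobManagerPresent, bool blobGranulesEnabled, double now, double& recruitDelay) {
        recruitDelay = 0.0;
        if (blobManagerPresent)
            return BmAction::NothingToRecruit; // handled by the branch above this hunk
        if (!blobGranulesEnabled)
            return BmAction::WaitForConfigChange; // wait on blobGranulesEnabled.onChange()
        // No manager and granules enabled: recruit, but no faster than the throttle allows.
        recruitDelay = std::max(0.0, lastRecruitStart + interval - now);
        lastRecruitStart = now;
        return BmAction::Recruit;
    }
};

int main() {
    BmDecisionSketch sketch;
    double delay = 0.0;
    sketch.decide(false, true, 0.0, delay);
    std::printf("first recruit delay:  %.2fs\n", delay); // 0.00
    sketch.decide(false, true, 0.1, delay);
    std::printf("second recruit delay: %.2fs\n", delay); // 0.40
}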