mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
grey failure detection account for the case where the connection between primary and satellite DC becomes bad.
This commit is contained in:
parent
ebb60f690b
commit
e017faa6c4
@ -527,6 +527,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||
init( CC_HEALTH_TRIGGER_FAILOVER, false );
|
||||
init( CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION, 5 );
|
||||
init( CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION, 10 );
|
||||
init( CC_ENABLE_ENTIRE_SATELLITE_MONITORING, false );
|
||||
init( CC_SATELLITE_DEGRADATION_MIN_COMPLAINER, 3 );
|
||||
init( CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER, 3 );
|
||||
|
||||
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
|
||||
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
|
||||
@ -717,6 +720,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||
init( PEER_LATENCY_CHECK_MIN_POPULATION, 30 );
|
||||
init( PEER_LATENCY_DEGRADATION_PERCENTILE, 0.90 );
|
||||
init( PEER_LATENCY_DEGRADATION_THRESHOLD, 0.05 );
|
||||
init( PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE, 0.90 );
|
||||
init( PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE, 0.1 );
|
||||
init( PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD, 0.1 );
|
||||
init( PEER_DEGRADATION_CONNECTION_FAILURE_COUNT, 1 );
|
||||
init( WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER, true );
|
||||
|
@ -464,6 +464,14 @@ public:
|
||||
// failover.
|
||||
int CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION; // The maximum number of degraded servers that can trigger a
|
||||
// failover.
|
||||
bool CC_ENABLE_ENTIRE_SATELLITE_MONITORING; // When enabled, gray failure tries to detect whether the entire
|
||||
// satellite DC is degraded.
|
||||
int CC_SATELLITE_DEGRADATION_MIN_COMPLAINER; // When the network between primary and satellite becomes bad, all the
|
||||
// workers in primary may have bad network talking to the satellite.
|
||||
// This is the minimum amount of complainer for a satellite worker to
|
||||
// be determined as degraded worker.
|
||||
int CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER; // The minimum amount of degraded server in satellite DC to be
|
||||
// determined as degraded satellite.
|
||||
|
||||
// Knobs used to select the best policy (via monte carlo)
|
||||
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
|
||||
@ -657,8 +665,12 @@ public:
|
||||
bool ENABLE_WORKER_HEALTH_MONITOR;
|
||||
double WORKER_HEALTH_MONITOR_INTERVAL; // Interval between two health monitor health check.
|
||||
int PEER_LATENCY_CHECK_MIN_POPULATION; // The minimum number of latency samples required to check a peer.
|
||||
double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health.
|
||||
double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health among workers inside
|
||||
// primary or remote DC.
|
||||
double PEER_LATENCY_DEGRADATION_THRESHOLD; // The latency threshold to consider a peer degraded.
|
||||
double PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE; // The percentile latency used to check peer health between
|
||||
// primary and primary satellite.
|
||||
double PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE; // The latency threshold to consider a peer degraded.
|
||||
double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD; // The percentage of timeout to consider a peer degraded.
|
||||
int PEER_DEGRADATION_CONNECTION_FAILURE_COUNT; // The number of connection failures experienced during measurement
|
||||
// period to consider a peer degraded.
|
||||
|
@ -2410,24 +2410,26 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
|
||||
wait(lowPriorityDelay(SERVER_KNOBS->CC_WORKER_HEALTH_CHECKING_INTERVAL));
|
||||
}
|
||||
|
||||
self->degradedServers = self->getServersWithDegradedLink();
|
||||
self->degradationInfo = self->getDegradationInfo();
|
||||
|
||||
// Compare `self->degradedServers` with `self->excludedDegradedServers` and remove those that have
|
||||
// recovered.
|
||||
for (auto it = self->excludedDegradedServers.begin(); it != self->excludedDegradedServers.end();) {
|
||||
if (self->degradedServers.find(*it) == self->degradedServers.end()) {
|
||||
if (self->degradationInfo.degradedServers.find(*it) == self->degradationInfo.degradedServers.end()) {
|
||||
self->excludedDegradedServers.erase(it++);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
if (!self->degradedServers.empty()) {
|
||||
if (!self->degradationInfo.degradedServers.empty() || self->degradationInfo.degradedSatellite) {
|
||||
std::string degradedServerString;
|
||||
for (const auto& server : self->degradedServers) {
|
||||
for (const auto& server : self->degradationInfo.degradedServers) {
|
||||
degradedServerString += server.toString() + " ";
|
||||
}
|
||||
TraceEvent("ClusterControllerHealthMonitor").detail("DegradedServers", degradedServerString);
|
||||
TraceEvent("ClusterControllerHealthMonitor")
|
||||
.detail("DegradedServers", degradedServerString)
|
||||
.detail("DegradedSatellite", self->degradationInfo.degradedSatellite);
|
||||
|
||||
// Check if the cluster controller should trigger a recovery to exclude any degraded servers from
|
||||
// the transaction system.
|
||||
@ -2435,7 +2437,7 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
|
||||
if (SERVER_KNOBS->CC_HEALTH_TRIGGER_RECOVERY) {
|
||||
if (self->recentRecoveryCountDueToHealth() < SERVER_KNOBS->CC_MAX_HEALTH_RECOVERY_COUNT) {
|
||||
self->recentHealthTriggeredRecoveryTime.push(now());
|
||||
self->excludedDegradedServers = self->degradedServers;
|
||||
self->excludedDegradedServers = self->degradationInfo.degradedServers;
|
||||
TraceEvent("DegradedServerDetectedAndTriggerRecovery")
|
||||
.detail("RecentRecoveryCountDueToHealth", self->recentRecoveryCountDueToHealth());
|
||||
self->db.forceMasterFailure.trigger();
|
||||
@ -2784,7 +2786,7 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
|
||||
TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
|
||||
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
|
||||
ClusterControllerData data(ClusterControllerFullInterface(),
|
||||
LocalityData(),
|
||||
@ -2800,18 +2802,18 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
|
||||
// cluster controller.
|
||||
{
|
||||
data.workerHealth[worker].degradedPeers[badPeer1] = { now(), now() };
|
||||
ASSERT(data.getServersWithDegradedLink().empty());
|
||||
ASSERT(data.getDegradationInfo().degradedServers.empty());
|
||||
data.workerHealth.clear();
|
||||
}
|
||||
|
||||
// Test that when there is only one reported degraded link, getServersWithDegradedLink can return correct
|
||||
// Test that when there is only one reported degraded link, getDegradationInfo can return correct
|
||||
// degraded server.
|
||||
{
|
||||
data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
|
||||
now() };
|
||||
auto degradedServers = data.getServersWithDegradedLink();
|
||||
ASSERT(degradedServers.size() == 1);
|
||||
ASSERT(degradedServers.find(badPeer1) != degradedServers.end());
|
||||
auto degradationInfo = data.getDegradationInfo();
|
||||
ASSERT(degradationInfo.degradedServers.size() == 1);
|
||||
ASSERT(degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
|
||||
data.workerHealth.clear();
|
||||
}
|
||||
|
||||
@ -2821,10 +2823,10 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
|
||||
now() };
|
||||
data.workerHealth[badPeer1].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
|
||||
now() };
|
||||
auto degradedServers = data.getServersWithDegradedLink();
|
||||
ASSERT(degradedServers.size() == 1);
|
||||
ASSERT(degradedServers.find(worker) != degradedServers.end() ||
|
||||
degradedServers.find(badPeer1) != degradedServers.end());
|
||||
auto degradationInfo = data.getDegradationInfo();
|
||||
ASSERT(degradationInfo.degradedServers.size() == 1);
|
||||
ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end() ||
|
||||
degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
|
||||
data.workerHealth.clear();
|
||||
}
|
||||
|
||||
@ -2839,9 +2841,9 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
|
||||
now() };
|
||||
data.workerHealth[badPeer2].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
|
||||
now() };
|
||||
auto degradedServers = data.getServersWithDegradedLink();
|
||||
ASSERT(degradedServers.size() == 1);
|
||||
ASSERT(degradedServers.find(worker) != degradedServers.end());
|
||||
auto degradationInfo = data.getDegradationInfo();
|
||||
ASSERT(degradationInfo.degradedServers.size() == 1);
|
||||
ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end());
|
||||
data.workerHealth.clear();
|
||||
}
|
||||
|
||||
@ -2856,7 +2858,7 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
|
||||
now() };
|
||||
data.workerHealth[badPeer4].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
|
||||
now() };
|
||||
ASSERT(data.getServersWithDegradedLink().empty());
|
||||
ASSERT(data.getDegradationInfo().degradedServers.empty());
|
||||
data.workerHealth.clear();
|
||||
}
|
||||
|
||||
@ -2880,7 +2882,7 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
|
||||
now() };
|
||||
data.workerHealth[badPeer4].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
|
||||
now() };
|
||||
ASSERT(data.getServersWithDegradedLink().empty());
|
||||
ASSERT(data.getDegradationInfo().degradedServers.empty());
|
||||
data.workerHealth.clear();
|
||||
}
|
||||
|
||||
@ -2977,42 +2979,42 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
|
||||
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
|
||||
// Trigger recovery when master is degraded.
|
||||
data.degradedServers.insert(master);
|
||||
data.degradationInfo.degradedServers.insert(master);
|
||||
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// Trigger recovery when primary TLog is degraded.
|
||||
data.degradedServers.insert(tlog);
|
||||
data.degradationInfo.degradedServers.insert(tlog);
|
||||
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// No recovery when satellite Tlog is degraded.
|
||||
data.degradedServers.insert(satelliteTlog);
|
||||
data.degradationInfo.degradedServers.insert(satelliteTlog);
|
||||
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// No recovery when remote tlog is degraded.
|
||||
data.degradedServers.insert(remoteTlog);
|
||||
data.degradationInfo.degradedServers.insert(remoteTlog);
|
||||
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// No recovery when log router is degraded.
|
||||
data.degradedServers.insert(logRouter);
|
||||
data.degradationInfo.degradedServers.insert(logRouter);
|
||||
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// No recovery when backup worker is degraded.
|
||||
data.degradedServers.insert(backup);
|
||||
data.degradationInfo.degradedServers.insert(backup);
|
||||
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// Trigger recovery when proxy is degraded.
|
||||
data.degradedServers.insert(proxy);
|
||||
data.degradationInfo.degradedServers.insert(proxy);
|
||||
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// Trigger recovery when resolver is degraded.
|
||||
data.degradedServers.insert(resolver);
|
||||
data.degradationInfo.degradedServers.insert(resolver);
|
||||
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
|
||||
|
||||
return Void();
|
||||
@ -3090,16 +3092,16 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
|
||||
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
|
||||
|
||||
// No failover when small number of degraded servers
|
||||
data.degradedServers.insert(master);
|
||||
data.degradationInfo.degradedServers.insert(master);
|
||||
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// Trigger failover when enough servers in the txn system are degraded.
|
||||
data.degradedServers.insert(master);
|
||||
data.degradedServers.insert(tlog);
|
||||
data.degradedServers.insert(proxy);
|
||||
data.degradedServers.insert(proxy2);
|
||||
data.degradedServers.insert(resolver);
|
||||
data.degradationInfo.degradedServers.insert(master);
|
||||
data.degradationInfo.degradedServers.insert(tlog);
|
||||
data.degradationInfo.degradedServers.insert(proxy);
|
||||
data.degradationInfo.degradedServers.insert(proxy2);
|
||||
data.degradationInfo.degradedServers.insert(resolver);
|
||||
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
|
||||
|
||||
// No failover when usable region is 1.
|
||||
@ -3108,18 +3110,29 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
|
||||
data.db.config.usableRegions = 2;
|
||||
|
||||
// No failover when remote is also degraded.
|
||||
data.degradedServers.insert(remoteTlog);
|
||||
data.degradationInfo.degradedServers.insert(remoteTlog);
|
||||
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// No failover when some are not from transaction system
|
||||
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
|
||||
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
|
||||
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
|
||||
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
|
||||
data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
|
||||
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
|
||||
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
|
||||
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
|
||||
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
|
||||
data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
|
||||
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
|
||||
data.degradedServers.clear();
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// Trigger failover when satellite is degraded.
|
||||
data.degradationInfo.degradedSatellite = true;
|
||||
ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
// No failover when satellite is degraded, but remote is not healthy.
|
||||
data.degradationInfo.degradedSatellite = true;
|
||||
data.degradationInfo.degradedServers.insert(remoteTlog);
|
||||
ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
|
||||
data.degradationInfo.degradedServers.clear();
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -2981,9 +2981,16 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
struct DegradationInfo {
|
||||
std::unordered_set<NetworkAddress>
|
||||
degradedServers; // The servers that the cluster controller is considered as degraded. The servers in this
|
||||
// list are not excluded unless they are added to `excludedDegradedServers`.
|
||||
|
||||
bool degradedSatellite = false; // Indicates that the entire satellite DC is degraded.
|
||||
};
|
||||
// Returns a list of servers who are experiencing degraded links. These are candidates to perform exclusion. Note
|
||||
// that only one endpoint of a bad link will be included in this list.
|
||||
std::unordered_set<NetworkAddress> getServersWithDegradedLink() {
|
||||
DegradationInfo getDegradationInfo() {
|
||||
updateRecoveredWorkers();
|
||||
|
||||
// Build a map keyed by measured degraded peer. This map gives the info that who complains a particular server.
|
||||
@ -3014,7 +3021,11 @@ public:
|
||||
//
|
||||
// For example, if server A is already considered as a degraded server, and A complains B, we won't add B as
|
||||
// degraded since A is already considered as degraded.
|
||||
//
|
||||
// In the meantime, we also count the number of satellite workers got complained. If enough number of satellite
|
||||
// workers are degraded, this may indicates that the whole network between primary and satellite is bad.
|
||||
std::unordered_set<NetworkAddress> currentDegradedServers;
|
||||
int satelliteBadServerCount = 0;
|
||||
for (const auto& [complainerCount, badServer] : count2DegradedPeer) {
|
||||
for (const auto& complainer : degradedLinkDst2Src[badServer]) {
|
||||
if (currentDegradedServers.find(complainer) == currentDegradedServers.end()) {
|
||||
@ -3022,23 +3033,37 @@ public:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (SERVER_KNOBS->CC_ENABLE_ENTIRE_SATELLITE_MONITORING &&
|
||||
addressInDbAndPrimarySatelliteDc(badServer, db.serverInfo) &&
|
||||
complainerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_COMPLAINER) {
|
||||
++satelliteBadServerCount;
|
||||
}
|
||||
}
|
||||
|
||||
// For degraded server that are complained by more than SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE, we
|
||||
// don't know if it is a hot server, or the network is bad. We remove from the returned degraded server list.
|
||||
std::unordered_set<NetworkAddress> currentDegradedServersWithinLimit;
|
||||
DegradationInfo currentDegradationInfo;
|
||||
for (const auto& badServer : currentDegradedServers) {
|
||||
if (degradedLinkDst2Src[badServer].size() <= SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE) {
|
||||
currentDegradedServersWithinLimit.insert(badServer);
|
||||
currentDegradationInfo.degradedServers.insert(badServer);
|
||||
}
|
||||
}
|
||||
return currentDegradedServersWithinLimit;
|
||||
|
||||
// If enough number of satellite workers are bad, we mark the entire satellite is bad. Note that this needs to
|
||||
// be used with caution (controlled by CC_ENABLE_ENTIRE_SATELLITE_MONITORING knob), since the slow workers may
|
||||
// also be caused by workload.
|
||||
if (satelliteBadServerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER) {
|
||||
currentDegradationInfo.degradedSatellite = true;
|
||||
}
|
||||
return currentDegradationInfo;
|
||||
}
|
||||
|
||||
// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
|
||||
bool transactionSystemContainsDegradedServers() {
|
||||
const ServerDBInfo dbi = db.serverInfo->get();
|
||||
for (const auto& excludedServer : degradedServers) {
|
||||
for (const auto& excludedServer : degradationInfo.degradedServers) {
|
||||
if (dbi.master.addresses().contains(excludedServer)) {
|
||||
return true;
|
||||
}
|
||||
@ -3083,7 +3108,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const auto& excludedServer : degradedServers) {
|
||||
for (const auto& excludedServer : degradationInfo.degradedServers) {
|
||||
if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
|
||||
return true;
|
||||
}
|
||||
@ -3121,7 +3146,7 @@ public:
|
||||
// Returns true when the cluster controller should trigger a recovery due to degraded servers used in the
|
||||
// transaction system in the primary data center.
|
||||
bool shouldTriggerRecoveryDueToDegradedServers() {
|
||||
if (degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
|
||||
if (degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -3154,8 +3179,14 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
if (degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
|
||||
degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
|
||||
bool remoteIsHealthy = !remoteTransactionSystemContainsDegradedServers();
|
||||
if (degradationInfo.degradedSatellite && remoteIsHealthy) {
|
||||
// If the satellite DC is bad, a failover is desired despite the number of degraded servers.
|
||||
return true;
|
||||
}
|
||||
|
||||
if (degradationInfo.degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
|
||||
degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -3165,7 +3196,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
return transactionSystemContainsDegradedServers() && !remoteTransactionSystemContainsDegradedServers();
|
||||
return transactionSystemContainsDegradedServers() && remoteIsHealthy;
|
||||
}
|
||||
|
||||
int recentRecoveryCountDueToHealth() {
|
||||
@ -3248,9 +3279,7 @@ public:
|
||||
// TODO(zhewu): Include disk and CPU signals.
|
||||
};
|
||||
std::unordered_map<NetworkAddress, WorkerHealth> workerHealth;
|
||||
std::unordered_set<NetworkAddress>
|
||||
degradedServers; // The servers that the cluster controller is considered as degraded. The servers in this list
|
||||
// are not excluded unless they are added to `excludedDegradedServers`.
|
||||
DegradationInfo degradationInfo;
|
||||
std::unordered_set<NetworkAddress>
|
||||
excludedDegradedServers; // The degraded servers to be excluded when assigning workers to roles.
|
||||
std::queue<double> recentHealthTriggeredRecoveryTime;
|
||||
|
@ -1142,6 +1142,10 @@ ACTOR Future<Void> backupWorker(BackupInterface bi,
|
||||
|
||||
void registerThreadForProfiling();
|
||||
|
||||
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's primary
|
||||
// satellite DC.
|
||||
bool addressInDbAndPrimarySatelliteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
|
||||
|
||||
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's remote DC.
|
||||
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
|
||||
|
||||
|
@ -778,6 +778,82 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
|
||||
|
||||
} // namespace
|
||||
|
||||
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's primary
|
||||
// satellite DC.
|
||||
bool addressInDbAndPrimarySatelliteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
for (const auto& logSet : dbInfo->get().logSystemConfig.tLogs) {
|
||||
if (logSet.isLocal && logSet.locality == tagLocalitySatellite) {
|
||||
for (const auto& tlog : logSet.tLogs) {
|
||||
if (tlog.present() && tlog.interf().addresses().contains(address)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool addressesInDbAndPrimarySatelliteDc(const NetworkAddressList& addresses,
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
return addressInDbAndPrimarySatelliteDc(addresses.address, dbInfo) ||
|
||||
(addresses.secondaryAddress.present() &&
|
||||
addressInDbAndPrimarySatelliteDc(addresses.secondaryAddress.get(), dbInfo));
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
TEST_CASE("/fdbserver/worker/addressInDbAndPrimarySatelliteDc") {
|
||||
// Setup a ServerDBInfo for test.
|
||||
ServerDBInfo testDbInfo;
|
||||
LocalityData testLocal;
|
||||
testLocal.set(LiteralStringRef("dcid"), StringRef(std::to_string(1)));
|
||||
testDbInfo.master.locality = testLocal;
|
||||
|
||||
// First, create an empty TLogInterface, and check that it shouldn't be considered as in satellite DC.
|
||||
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
|
||||
testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
|
||||
testDbInfo.logSystemConfig.tLogs.back().locality = tagLocalitySatellite;
|
||||
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>());
|
||||
ASSERT(!addressInDbAndPrimarySatelliteDc(g_network->getLocalAddress(),
|
||||
makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
|
||||
|
||||
// Create a satellite tlog, and it should be considered as in primary satellite DC.
|
||||
NetworkAddress satelliteTLogAddress(IPAddress(0x13131313), 1);
|
||||
TLogInterface satelliteTLog(testLocal);
|
||||
satelliteTLog.initEndpoints();
|
||||
satelliteTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ satelliteTLogAddress }, UID(1, 2)));
|
||||
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(satelliteTLog));
|
||||
ASSERT(addressInDbAndPrimarySatelliteDc(satelliteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
|
||||
|
||||
// Create a primary TLog, and it shouldn't be considered as in primary Satellite DC.
|
||||
NetworkAddress primaryTLogAddress(IPAddress(0x26262626), 1);
|
||||
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
|
||||
testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
|
||||
TLogInterface primaryTLog(testLocal);
|
||||
primaryTLog.initEndpoints();
|
||||
primaryTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ primaryTLogAddress }, UID(1, 2)));
|
||||
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(primaryTLog));
|
||||
ASSERT(!addressInDbAndPrimarySatelliteDc(primaryTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
|
||||
|
||||
// Create a remote TLog, and it should be considered as in remote DC.
|
||||
NetworkAddress remoteTLogAddress(IPAddress(0x37373737), 1);
|
||||
LocalityData fakeRemote;
|
||||
fakeRemote.set(LiteralStringRef("dcid"), StringRef(std::to_string(2)));
|
||||
TLogInterface remoteTLog(fakeRemote);
|
||||
remoteTLog.initEndpoints();
|
||||
remoteTLog.peekMessages = RequestStream<struct TLogPeekRequest>(Endpoint({ remoteTLogAddress }, UID(1, 2)));
|
||||
|
||||
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
|
||||
testDbInfo.logSystemConfig.tLogs.back().isLocal = false;
|
||||
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(remoteTLog));
|
||||
ASSERT(!addressInDbAndPrimarySatelliteDc(remoteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
const auto& dbi = dbInfo->get();
|
||||
|
||||
@ -872,17 +948,17 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
|
||||
const auto& allPeers = FlowTransport::transport().getAllPeers();
|
||||
UpdateWorkerHealthRequest req;
|
||||
|
||||
bool workerInDb = false;
|
||||
bool workerInPrimary = false;
|
||||
enum WorkerLocation { None, Primary, PrimarySatellite, Remote };
|
||||
WorkerLocation workerLocation = None;
|
||||
if (addressesInDbAndPrimaryDc(interf.addresses(), dbInfo)) {
|
||||
workerInDb = true;
|
||||
workerInPrimary = true;
|
||||
workerLocation = Primary;
|
||||
} else if (addressesInDbAndPrimarySatelliteDc(interf.addresses(), dbInfo)) {
|
||||
workerLocation = PrimarySatellite;
|
||||
} else if (addressesInDbAndRemoteDc(interf.addresses(), dbInfo)) {
|
||||
workerInDb = true;
|
||||
workerInPrimary = false;
|
||||
workerLocation = Remote;
|
||||
}
|
||||
|
||||
if (workerInDb) {
|
||||
if (workerLocation != None) {
|
||||
for (const auto& [address, peer] : allPeers) {
|
||||
if (peer->connectFailedCount == 0 &&
|
||||
peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
|
||||
@ -895,37 +971,50 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
|
||||
// last ping latencies logged.
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
|
||||
(!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
|
||||
// Only monitoring the servers that in the primary or remote DC's transaction systems.
|
||||
// Note that currently we are not monitor storage servers, since lagging in storage servers
|
||||
// today already can trigger server exclusion by data distributor.
|
||||
bool degradedPeer = false;
|
||||
if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
|
||||
(workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo))) {
|
||||
// Monitors intra DC latencies between servers that in the primary or remote DC's transaction
|
||||
// systems. Note that currently we are not monitor storage servers, since lagging in storage
|
||||
// servers today already can trigger server exclusion by data distributor.
|
||||
|
||||
if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT ||
|
||||
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE) >
|
||||
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD ||
|
||||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
|
||||
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
|
||||
// This is a degraded peer.
|
||||
TraceEvent("HealthMonitorDetectDegradedPeer")
|
||||
.suppressFor(30)
|
||||
.detail("Peer", address)
|
||||
.detail("Elapsed", now() - peer->lastLoggedTime)
|
||||
.detail("MinLatency", peer->pingLatencies.min())
|
||||
.detail("MaxLatency", peer->pingLatencies.max())
|
||||
.detail("MeanLatency", peer->pingLatencies.mean())
|
||||
.detail("MedianLatency", peer->pingLatencies.median())
|
||||
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
|
||||
.detail(
|
||||
"CheckedPercentileLatency",
|
||||
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
|
||||
.detail("PingCount", peer->pingLatencies.getPopulationSize())
|
||||
.detail("PingTimeoutCount", peer->timeoutCount)
|
||||
.detail("ConnectionFailureCount", peer->connectFailedCount);
|
||||
|
||||
req.degradedPeers.push_back(address);
|
||||
degradedPeer = true;
|
||||
}
|
||||
} else if (workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo)) {
|
||||
// Monitors inter DC latencies between servers in primary and primary satellite DC. Note that
|
||||
// TLog workers in primary satellite DC are on the critical path of serving a commit.
|
||||
if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT ||
|
||||
peer->pingLatencies.percentile(
|
||||
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE) >
|
||||
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE ||
|
||||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
|
||||
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
|
||||
degradedPeer = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (degradedPeer) {
|
||||
TraceEvent("HealthMonitorDetectDegradedPeer")
|
||||
.suppressFor(30)
|
||||
.detail("Peer", address)
|
||||
.detail("Elapsed", now() - peer->lastLoggedTime)
|
||||
.detail("MinLatency", peer->pingLatencies.min())
|
||||
.detail("MaxLatency", peer->pingLatencies.max())
|
||||
.detail("MeanLatency", peer->pingLatencies.mean())
|
||||
.detail("MedianLatency", peer->pingLatencies.median())
|
||||
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
|
||||
.detail("CheckedPercentileLatency",
|
||||
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
|
||||
.detail("PingCount", peer->pingLatencies.getPopulationSize())
|
||||
.detail("PingTimeoutCount", peer->timeoutCount)
|
||||
.detail("ConnectionFailureCount", peer->connectFailedCount);
|
||||
|
||||
req.degradedPeers.push_back(address);
|
||||
}
|
||||
}
|
||||
|
||||
@ -941,8 +1030,9 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
|
||||
(!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
|
||||
if ((workerLocation == Primary && addressInDbAndPrimaryDc(address, dbInfo)) ||
|
||||
(workerLocation == Remote && addressInDbAndRemoteDc(address, dbInfo)) ||
|
||||
(workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo))) {
|
||||
TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
|
||||
req.degradedPeers.push_back(address);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user