diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 917664343c..14a97b0fad 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -527,6 +527,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( CC_HEALTH_TRIGGER_FAILOVER, false );
 	init( CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION, 5 );
 	init( CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION, 10 );
+	init( CC_ENABLE_ENTIRE_SATELLITE_MONITORING, false );
+	init( CC_SATELLITE_DEGRADATION_MIN_COMPLAINER, 3 );
+	init( CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER, 3 );
 	init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
 	init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
@@ -717,6 +720,8 @@
 	init( PEER_LATENCY_CHECK_MIN_POPULATION, 30 );
 	init( PEER_LATENCY_DEGRADATION_PERCENTILE, 0.90 );
 	init( PEER_LATENCY_DEGRADATION_THRESHOLD, 0.05 );
+	init( PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE, 0.90 );
+	init( PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE, 0.1 );
 	init( PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD, 0.1 );
 	init( PEER_DEGRADATION_CONNECTION_FAILURE_COUNT, 1 );
 	init( WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER, true );
diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h
index 13859ac9d6..acb2006784 100644
--- a/fdbclient/ServerKnobs.h
+++ b/fdbclient/ServerKnobs.h
@@ -464,6 +464,14 @@ public:
 	                                               // failover.
 	int CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION; // The maximum number of degraded servers that can trigger a
 	                                               // failover.
+	bool CC_ENABLE_ENTIRE_SATELLITE_MONITORING; // When enabled, gray failure detection tries to determine whether the
+	                                            // entire satellite DC is degraded.
+	int CC_SATELLITE_DEGRADATION_MIN_COMPLAINER; // When the network between the primary and the satellite becomes bad,
+	                                             // all workers in the primary may report bad connections to the
+	                                             // satellite. This is the minimum number of complainers required for a
+	                                             // satellite worker to be considered degraded.
+	int CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER; // The minimum number of degraded servers in the satellite DC required
+	                                             // to consider the entire satellite degraded.

 	// Knobs used to select the best policy (via monte carlo)
 	int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
@@ -657,8 +665,12 @@ public:
 	bool ENABLE_WORKER_HEALTH_MONITOR;
 	double WORKER_HEALTH_MONITOR_INTERVAL; // Interval between two health monitor health check.
 	int PEER_LATENCY_CHECK_MIN_POPULATION; // The minimum number of latency samples required to check a peer.
-	double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health.
+	double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health among workers
+	                                            // inside the primary or remote DC.
 	double PEER_LATENCY_DEGRADATION_THRESHOLD; // The latency threshold to consider a peer degraded.
+	double PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE; // The percentile latency used to check peer health between
+	                                                      // the primary and the primary satellite DC.
+	double PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE; // The latency threshold to consider a satellite peer degraded.
 	double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD; // The percentage of timeout to consider a peer degraded.
 	int PEER_DEGRADATION_CONNECTION_FAILURE_COUNT; // The number of connection failures experienced during measurement
 	                                               // period to consider a peer degraded.
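The two new satellite knobs work as a pair: a satellite server only counts as bad once enough distinct primary workers complain about it (CC_SATELLITE_DEGRADATION_MIN_COMPLAINER), and the whole satellite DC is only flagged once enough satellite servers are bad (CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER). The following is a minimal standalone sketch of that two-threshold decision, not the patch's actual code; it uses hypothetical types (std::string keys, a SatelliteKnobs struct) instead of the real ClusterControllerData members and NetworkAddress.

#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in for the relevant server knobs.
struct SatelliteKnobs {
	bool enableEntireSatelliteMonitoring = false; // CC_ENABLE_ENTIRE_SATELLITE_MONITORING
	int minComplainers = 3;                       // CC_SATELLITE_DEGRADATION_MIN_COMPLAINER
	int minBadServers = 3;                        // CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER
};

// `complainersPerSatelliteServer` maps each satellite server to the set of primary
// workers that reported a degraded link to it.
bool satelliteLooksDegraded(
    const std::unordered_map<std::string, std::unordered_set<std::string>>& complainersPerSatelliteServer,
    const SatelliteKnobs& knobs) {
	if (!knobs.enableEntireSatelliteMonitoring) {
		return false;
	}
	int badServers = 0;
	for (const auto& [server, complainers] : complainersPerSatelliteServer) {
		// A satellite server is counted as bad only once enough distinct workers complain about it.
		if ((int)complainers.size() >= knobs.minComplainers) {
			++badServers;
		}
	}
	// The entire satellite is flagged only when enough of its servers look bad.
	return badServers >= knobs.minBadServers;
}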
diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp
index 8d44c257f6..8a75c9516c 100644
--- a/fdbserver/ClusterController.actor.cpp
+++ b/fdbserver/ClusterController.actor.cpp
@@ -2410,24 +2410,26 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
 			wait(lowPriorityDelay(SERVER_KNOBS->CC_WORKER_HEALTH_CHECKING_INTERVAL));
 		}

-		self->degradedServers = self->getServersWithDegradedLink();
+		self->degradationInfo = self->getDegradationInfo();

 		// Compare `self->degradedServers` with `self->excludedDegradedServers` and remove those that have
 		// recovered.
 		for (auto it = self->excludedDegradedServers.begin(); it != self->excludedDegradedServers.end();) {
-			if (self->degradedServers.find(*it) == self->degradedServers.end()) {
+			if (self->degradationInfo.degradedServers.find(*it) == self->degradationInfo.degradedServers.end()) {
 				self->excludedDegradedServers.erase(it++);
 			} else {
 				++it;
 			}
 		}

-		if (!self->degradedServers.empty()) {
+		if (!self->degradationInfo.degradedServers.empty() || self->degradationInfo.degradedSatellite) {
 			std::string degradedServerString;
-			for (const auto& server : self->degradedServers) {
+			for (const auto& server : self->degradationInfo.degradedServers) {
 				degradedServerString += server.toString() + " ";
 			}
-			TraceEvent("ClusterControllerHealthMonitor").detail("DegradedServers", degradedServerString);
+			TraceEvent("ClusterControllerHealthMonitor")
+			    .detail("DegradedServers", degradedServerString)
+			    .detail("DegradedSatellite", self->degradationInfo.degradedSatellite);

 			// Check if the cluster controller should trigger a recovery to exclude any degraded servers from
 			// the transaction system.
@@ -2435,7 +2437,7 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
 			if (SERVER_KNOBS->CC_HEALTH_TRIGGER_RECOVERY) {
 				if (self->recentRecoveryCountDueToHealth() < SERVER_KNOBS->CC_MAX_HEALTH_RECOVERY_COUNT) {
 					self->recentHealthTriggeredRecoveryTime.push(now());
-					self->excludedDegradedServers = self->degradedServers;
+					self->excludedDegradedServers = self->degradationInfo.degradedServers;
 					TraceEvent("DegradedServerDetectedAndTriggerRecovery")
 					    .detail("RecentRecoveryCountDueToHealth", self->recentRecoveryCountDueToHealth());
 					self->db.forceMasterFailure.trigger();
@@ -2784,7 +2786,7 @@ TEST_CASE("/fdbserver/clustercontroller/updateRecoveredWorkers") {
 	return Void();
 }

-TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
+TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
 	// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
 	ClusterControllerData data(ClusterControllerFullInterface(),
 	                           LocalityData(),
@@ -2800,18 +2802,18 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
 	// cluster controller.
 	{
 		data.workerHealth[worker].degradedPeers[badPeer1] = { now(), now() };
-		ASSERT(data.getServersWithDegradedLink().empty());
+		ASSERT(data.getDegradationInfo().degradedServers.empty());
 		data.workerHealth.clear();
 	}

-	// Test that when there is only one reported degraded link, getServersWithDegradedLink can return correct
+	// Test that when there is only one reported degraded link, getDegradationInfo can return the correct
 	// degraded server.
 	{
 		data.workerHealth[worker].degradedPeers[badPeer1] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
 			                                                  now() };
-		auto degradedServers = data.getServersWithDegradedLink();
-		ASSERT(degradedServers.size() == 1);
-		ASSERT(degradedServers.find(badPeer1) != degradedServers.end());
+		auto degradationInfo = data.getDegradationInfo();
+		ASSERT(degradationInfo.degradedServers.size() == 1);
+		ASSERT(degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
 		data.workerHealth.clear();
 	}

@@ -2821,10 +2823,10 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
 			                                                  now() };
 		data.workerHealth[badPeer1].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
 			                                                  now() };
-		auto degradedServers = data.getServersWithDegradedLink();
-		ASSERT(degradedServers.size() == 1);
-		ASSERT(degradedServers.find(worker) != degradedServers.end() ||
-		       degradedServers.find(badPeer1) != degradedServers.end());
+		auto degradationInfo = data.getDegradationInfo();
+		ASSERT(degradationInfo.degradedServers.size() == 1);
+		ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end() ||
+		       degradationInfo.degradedServers.find(badPeer1) != degradationInfo.degradedServers.end());
 		data.workerHealth.clear();
 	}

@@ -2839,9 +2841,9 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
 			                                                  now() };
 		data.workerHealth[badPeer2].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
 			                                                  now() };
-		auto degradedServers = data.getServersWithDegradedLink();
-		ASSERT(degradedServers.size() == 1);
-		ASSERT(degradedServers.find(worker) != degradedServers.end());
+		auto degradationInfo = data.getDegradationInfo();
+		ASSERT(degradationInfo.degradedServers.size() == 1);
+		ASSERT(degradationInfo.degradedServers.find(worker) != degradationInfo.degradedServers.end());
 		data.workerHealth.clear();
 	}

@@ -2856,7 +2858,7 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
 			                                                  now() };
 		data.workerHealth[badPeer4].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
 			                                                  now() };
-		ASSERT(data.getServersWithDegradedLink().empty());
+		ASSERT(data.getDegradationInfo().degradedServers.empty());
 		data.workerHealth.clear();
 	}

@@ -2880,7 +2882,7 @@ TEST_CASE("/fdbserver/clustercontroller/getServersWithDegradedLink") {
 			                                                  now() };
 		data.workerHealth[badPeer4].degradedPeers[worker] = { now() - SERVER_KNOBS->CC_MIN_DEGRADATION_INTERVAL - 1,
 			                                                  now() };
-		ASSERT(data.getServersWithDegradedLink().empty());
+		ASSERT(data.getDegradationInfo().degradedServers.empty());
 		data.workerHealth.clear();
 	}

@@ -2977,42 +2979,42 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
 	ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());

 	// Trigger recovery when master is degraded.
-	data.degradedServers.insert(master);
+	data.degradationInfo.degradedServers.insert(master);
 	ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// Trigger recovery when primary TLog is degraded.
-	data.degradedServers.insert(tlog);
+	data.degradationInfo.degradedServers.insert(tlog);
 	ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// No recovery when satellite Tlog is degraded.
-	data.degradedServers.insert(satelliteTlog);
+	data.degradationInfo.degradedServers.insert(satelliteTlog);
 	ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// No recovery when remote tlog is degraded.
-	data.degradedServers.insert(remoteTlog);
+	data.degradationInfo.degradedServers.insert(remoteTlog);
 	ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// No recovery when log router is degraded.
-	data.degradedServers.insert(logRouter);
+	data.degradationInfo.degradedServers.insert(logRouter);
 	ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// No recovery when backup worker is degraded.
-	data.degradedServers.insert(backup);
+	data.degradationInfo.degradedServers.insert(backup);
 	ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// Trigger recovery when proxy is degraded.
-	data.degradedServers.insert(proxy);
+	data.degradationInfo.degradedServers.insert(proxy);
 	ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// Trigger recovery when resolver is degraded.
-	data.degradedServers.insert(resolver);
+	data.degradationInfo.degradedServers.insert(resolver);
 	ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());

 	return Void();
@@ -3090,16 +3092,16 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
 	ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());

 	// No failover when small number of degraded servers
-	data.degradedServers.insert(master);
+	data.degradationInfo.degradedServers.insert(master);
 	ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// Trigger failover when enough servers in the txn system are degraded.
-	data.degradedServers.insert(master);
-	data.degradedServers.insert(tlog);
-	data.degradedServers.insert(proxy);
-	data.degradedServers.insert(proxy2);
-	data.degradedServers.insert(resolver);
+	data.degradationInfo.degradedServers.insert(master);
+	data.degradationInfo.degradedServers.insert(tlog);
+	data.degradationInfo.degradedServers.insert(proxy);
+	data.degradationInfo.degradedServers.insert(proxy2);
+	data.degradationInfo.degradedServers.insert(resolver);
 	ASSERT(data.shouldTriggerFailoverDueToDegradedServers());

 	// No failover when usable region is 1.
@@ -3108,18 +3110,29 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerFailoverDueToDegradedServer
 	data.db.config.usableRegions = 2;

 	// No failover when remote is also degraded.
-	data.degradedServers.insert(remoteTlog);
+	data.degradationInfo.degradedServers.insert(remoteTlog);
 	ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();

 	// No failover when some are not from transaction system
-	data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
-	data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
-	data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
-	data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
-	data.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
+	data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 1));
+	data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 2));
+	data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 3));
+	data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 4));
+	data.degradationInfo.degradedServers.insert(NetworkAddress(IPAddress(0x13131313), 5));
 	ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
-	data.degradedServers.clear();
+	data.degradationInfo.degradedServers.clear();
+
+	// Trigger failover when satellite is degraded.
+	data.degradationInfo.degradedSatellite = true;
+	ASSERT(data.shouldTriggerFailoverDueToDegradedServers());
+	data.degradationInfo.degradedServers.clear();
+
+	// No failover when satellite is degraded, but remote is not healthy.
+	data.degradationInfo.degradedSatellite = true;
+	data.degradationInfo.degradedServers.insert(remoteTlog);
+	ASSERT(!data.shouldTriggerFailoverDueToDegradedServers());
+	data.degradationInfo.degradedServers.clear();

 	return Void();
 }
diff --git a/fdbserver/ClusterController.actor.h b/fdbserver/ClusterController.actor.h
index d9e245425b..c2380cbc38 100644
--- a/fdbserver/ClusterController.actor.h
+++ b/fdbserver/ClusterController.actor.h
@@ -2981,9 +2981,16 @@ public:
 		}
 	}

+	struct DegradationInfo {
+		std::unordered_set<NetworkAddress>
+		    degradedServers; // The servers that the cluster controller considers degraded. The servers in this
+		                     // list are not excluded unless they are added to `excludedDegradedServers`.
+
+		bool degradedSatellite = false; // Indicates that the entire satellite DC is degraded.
+	};
 	// Returns a list of servers who are experiencing degraded links. These are candidates to perform exclusion. Note
 	// that only one endpoint of a bad link will be included in this list.
-	std::unordered_set<NetworkAddress> getServersWithDegradedLink() {
+	DegradationInfo getDegradationInfo() {
 		updateRecoveredWorkers();

 		// Build a map keyed by measured degraded peer. This map gives the info that who complains a particular server.
@@ -3014,7 +3021,11 @@ public:
 		//
 		// For example, if server A is already considered as a degraded server, and A complains B, we won't add B as
 		// degraded since A is already considered as degraded.
+		//
+		// In the meantime, we also count the number of satellite workers that are complained about. If enough
+		// satellite workers are degraded, this may indicate that the whole network between the primary and the satellite is bad.
 		std::unordered_set<NetworkAddress> currentDegradedServers;
+		int satelliteBadServerCount = 0;
 		for (const auto& [complainerCount, badServer] : count2DegradedPeer) {
 			for (const auto& complainer : degradedLinkDst2Src[badServer]) {
 				if (currentDegradedServers.find(complainer) == currentDegradedServers.end()) {
 					currentDegradedServers.insert(badServer);
 					break;
 				}
 			}
+
+			if (SERVER_KNOBS->CC_ENABLE_ENTIRE_SATELLITE_MONITORING &&
+			    addressInDbAndPrimarySatelliteDc(badServer, db.serverInfo) &&
+			    complainerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_COMPLAINER) {
+				++satelliteBadServerCount;
+			}
 		}

 		// For degraded server that are complained by more than SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE, we
 		// don't know if it is a hot server, or the network is bad. We remove from the returned degraded server list.
 		std::unordered_set<NetworkAddress> currentDegradedServersWithinLimit;
+		DegradationInfo currentDegradationInfo;
 		for (const auto& badServer : currentDegradedServers) {
 			if (degradedLinkDst2Src[badServer].size() <= SERVER_KNOBS->CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE) {
-				currentDegradedServersWithinLimit.insert(badServer);
+				currentDegradationInfo.degradedServers.insert(badServer);
 			}
 		}
-		return currentDegradedServersWithinLimit;
+
+		// If enough satellite workers are bad, we mark the entire satellite as degraded. Note that this needs to
+		// be used with caution (controlled by the CC_ENABLE_ENTIRE_SATELLITE_MONITORING knob), since slow workers
+		// may also be caused by workload.
+		if (satelliteBadServerCount >= SERVER_KNOBS->CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER) {
+			currentDegradationInfo.degradedSatellite = true;
+		}
+		return currentDegradationInfo;
 	}

 	// Whether the transaction system (in primary DC if in HA setting) contains degraded servers.
 	bool transactionSystemContainsDegradedServers() {
 		const ServerDBInfo dbi = db.serverInfo->get();
-		for (const auto& excludedServer : degradedServers) {
+		for (const auto& excludedServer : degradationInfo.degradedServers) {
 			if (dbi.master.addresses().contains(excludedServer)) {
 				return true;
 			}
@@ -3083,7 +3108,7 @@ public:
 			return false;
 		}

-		for (const auto& excludedServer : degradedServers) {
+		for (const auto& excludedServer : degradationInfo.degradedServers) {
 			if (addressInDbAndRemoteDc(excludedServer, db.serverInfo)) {
 				return true;
 			}
@@ -3121,7 +3146,7 @@ public:
 	// Returns true when the cluster controller should trigger a recovery due to degraded servers used in the
 	// transaction system in the primary data center.
 	bool shouldTriggerRecoveryDueToDegradedServers() {
-		if (degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
+		if (degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_MAX_EXCLUSION_DUE_TO_HEALTH) {
 			return false;
 		}
@@ -3154,8 +3179,14 @@ public:
 			return false;
 		}

-		if (degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
-		    degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
+		bool remoteIsHealthy = !remoteTransactionSystemContainsDegradedServers();
+		if (degradationInfo.degradedSatellite && remoteIsHealthy) {
+			// If the satellite DC is bad, a failover is desired regardless of the number of degraded servers.
+			return true;
+		}
+
+		if (degradationInfo.degradedServers.size() < SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MIN_DEGRADATION ||
+		    degradationInfo.degradedServers.size() > SERVER_KNOBS->CC_FAILOVER_DUE_TO_HEALTH_MAX_DEGRADATION) {
 			return false;
 		}
@@ -3165,7 +3196,7 @@ public:
 			return false;
 		}

-		return transactionSystemContainsDegradedServers() && !remoteTransactionSystemContainsDegradedServers();
+		return transactionSystemContainsDegradedServers() && remoteIsHealthy;
 	}

 	int recentRecoveryCountDueToHealth() {
@@ -3248,9 +3279,7 @@ public:
 		// TODO(zhewu): Include disk and CPU signals.
 	};
 	std::unordered_map<NetworkAddress, WorkerHealth> workerHealth;
-	std::unordered_set<NetworkAddress>
-	    degradedServers; // The servers that the cluster controller is considered as degraded. The servers in this list
-	                     // are not excluded unless they are added to `excludedDegradedServers`.
+	DegradationInfo degradationInfo;
 	std::unordered_set<NetworkAddress>
 	    excludedDegradedServers; // The degraded servers to be excluded when assigning workers to roles.
 	std::queue<double> recentHealthTriggeredRecoveryTime;
diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h
index 695990aa7a..78ef782a33 100644
--- a/fdbserver/WorkerInterface.actor.h
+++ b/fdbserver/WorkerInterface.actor.h
@@ -1142,6 +1142,10 @@ ACTOR Future<Void> backupWorker(BackupInterface bi,

 void registerThreadForProfiling();

+// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's primary
+// satellite DC.
+bool addressInDbAndPrimarySatelliteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
+
 // Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's remote DC.
 bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo);

diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 088b0697dc..f416098021 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -778,6 +778,82 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
 }

 } // namespace

+// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's primary
+// satellite DC.
+bool addressInDbAndPrimarySatelliteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
+	for (const auto& logSet : dbInfo->get().logSystemConfig.tLogs) {
+		if (logSet.isLocal && logSet.locality == tagLocalitySatellite) {
+			for (const auto& tlog : logSet.tLogs) {
+				if (tlog.present() && tlog.interf().addresses().contains(address)) {
+					return true;
+				}
+			}
+		}
+	}
+
+	return false;
+}
+
+bool addressesInDbAndPrimarySatelliteDc(const NetworkAddressList& addresses,
+                                        Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
+	return addressInDbAndPrimarySatelliteDc(addresses.address, dbInfo) ||
+	       (addresses.secondaryAddress.present() &&
+	        addressInDbAndPrimarySatelliteDc(addresses.secondaryAddress.get(), dbInfo));
+}
+
+namespace {
+
+TEST_CASE("/fdbserver/worker/addressInDbAndPrimarySatelliteDc") {
+	// Set up a ServerDBInfo for the test.
+	ServerDBInfo testDbInfo;
+	LocalityData testLocal;
+	testLocal.set(LiteralStringRef("dcid"), StringRef(std::to_string(1)));
+	testDbInfo.master.locality = testLocal;
+
+	// First, create an empty TLogInterface, and check that it shouldn't be considered as in the primary satellite DC.
+	testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
+	testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
+	testDbInfo.logSystemConfig.tLogs.back().locality = tagLocalitySatellite;
+	testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>());
+	ASSERT(!addressInDbAndPrimarySatelliteDc(g_network->getLocalAddress(),
+	                                         makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
+
+	// Create a satellite tlog, and it should be considered as in the primary satellite DC.
+	NetworkAddress satelliteTLogAddress(IPAddress(0x13131313), 1);
+	TLogInterface satelliteTLog(testLocal);
+	satelliteTLog.initEndpoints();
+	satelliteTLog.peekMessages = RequestStream<TLogPeekRequest>(Endpoint({ satelliteTLogAddress }, UID(1, 2)));
+	testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>(satelliteTLog));
+	ASSERT(addressInDbAndPrimarySatelliteDc(satelliteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
+
+	// Create a primary TLog, and it shouldn't be considered as in the primary satellite DC.
+	NetworkAddress primaryTLogAddress(IPAddress(0x26262626), 1);
+	testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
+	testDbInfo.logSystemConfig.tLogs.back().isLocal = true;
+	TLogInterface primaryTLog(testLocal);
+	primaryTLog.initEndpoints();
+	primaryTLog.peekMessages = RequestStream<TLogPeekRequest>(Endpoint({ primaryTLogAddress }, UID(1, 2)));
+	testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>(primaryTLog));
+	ASSERT(!addressInDbAndPrimarySatelliteDc(primaryTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
+
+	// Create a remote TLog; it is in the remote DC, so it shouldn't be considered as in the primary satellite DC.
+	NetworkAddress remoteTLogAddress(IPAddress(0x37373737), 1);
+	LocalityData fakeRemote;
+	fakeRemote.set(LiteralStringRef("dcid"), StringRef(std::to_string(2)));
+	TLogInterface remoteTLog(fakeRemote);
+	remoteTLog.initEndpoints();
+	remoteTLog.peekMessages = RequestStream<TLogPeekRequest>(Endpoint({ remoteTLogAddress }, UID(1, 2)));
+
+	testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
+	testDbInfo.logSystemConfig.tLogs.back().isLocal = false;
+	testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>(remoteTLog));
+	ASSERT(!addressInDbAndPrimarySatelliteDc(remoteTLogAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
+
+	return Void();
+}
+
+} // namespace
+
 bool addressInDbAndRemoteDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
 	const auto& dbi = dbInfo->get();
@@ -872,17 +948,17 @@ ACTOR Future<Void> healthMonitor(Reference
 			if (peer->connectFailedCount == 0 &&
 			    peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
@@ -895,37 +971,50 @@ ACTOR Future<Void> healthMonitor(Reference
 					if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT ||
 					    peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE) >
 					        SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD ||
 					    peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
 					        SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
-						// This is a degraded peer.
-						TraceEvent("HealthMonitorDetectDegradedPeer")
-						    .suppressFor(30)
-						    .detail("Peer", address)
-						    .detail("Elapsed", now() - peer->lastLoggedTime)
-						    .detail("MinLatency", peer->pingLatencies.min())
-						    .detail("MaxLatency", peer->pingLatencies.max())
-						    .detail("MeanLatency", peer->pingLatencies.mean())
-						    .detail("MedianLatency", peer->pingLatencies.median())
-						    .detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
-						    .detail(
-						        "CheckedPercentileLatency",
-						        peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
-						    .detail("PingCount", peer->pingLatencies.getPopulationSize())
-						    .detail("PingTimeoutCount", peer->timeoutCount)
-						    .detail("ConnectionFailureCount", peer->connectFailedCount);
-
-						req.degradedPeers.push_back(address);
+						degradedPeer = true;
 					}
+				} else if (workerLocation == Primary && addressInDbAndPrimarySatelliteDc(address, dbInfo)) {
+					// Monitor inter-DC latencies between servers in the primary and the primary satellite DC. Note
+					// that TLog workers in the primary satellite DC are on the critical path of serving a commit.
+					if (peer->connectFailedCount >= SERVER_KNOBS->PEER_DEGRADATION_CONNECTION_FAILURE_COUNT ||
+					    peer->pingLatencies.percentile(
+					        SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE_SATELLITE) >
+					        SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD_SATELLITE ||
+					    peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
+					        SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
+						degradedPeer = true;
+					}
+				}
+
+				if (degradedPeer) {
+					TraceEvent("HealthMonitorDetectDegradedPeer")
+					    .suppressFor(30)
+					    .detail("Peer", address)
+					    .detail("Elapsed", now() - peer->lastLoggedTime)
+					    .detail("MinLatency", peer->pingLatencies.min())
+					    .detail("MaxLatency", peer->pingLatencies.max())
+					    .detail("MeanLatency", peer->pingLatencies.mean())
+					    .detail("MedianLatency", peer->pingLatencies.median())
+					    .detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
+					    .detail("CheckedPercentileLatency",
+					            peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
+					    .detail("PingCount", peer->pingLatencies.getPopulationSize())
+					    .detail("PingTimeoutCount", peer->timeoutCount)
+					    .detail("ConnectionFailureCount", peer->connectFailedCount);
+
+					req.degradedPeers.push_back(address);
 				}
 			}

@@ -941,8 +1030,9 @@ ACTOR Future<Void> healthMonitor(Reference