diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index fc82b60434..917664343c 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -719,6 +719,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( PEER_LATENCY_DEGRADATION_THRESHOLD, 0.05 );
 	init( PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD, 0.1 );
 	init( PEER_DEGRADATION_CONNECTION_FAILURE_COUNT, 1 );
+	init( WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER, true );
 
 	// Test harness
 	init( WORKER_POLL_DELAY, 1.0 );
diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h
index ec7fdc08b0..13859ac9d6 100644
--- a/fdbclient/ServerKnobs.h
+++ b/fdbclient/ServerKnobs.h
@@ -662,6 +662,9 @@ public:
 	double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD; // The percentage of timeout to consider a peer degraded.
 	int PEER_DEGRADATION_CONNECTION_FAILURE_COUNT; // The number of connection failures experienced during measurement
 	                                               // period to consider a peer degraded.
+	bool WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER; // When enabled, the worker's health monitor also reports any
+	                                                 // recently destroyed peers that are part of the transaction
+	                                                 // system to the cluster controller.
 
 	// Test harness
 	double WORKER_POLL_DELAY;
diff --git a/fdbrpc/HealthMonitor.actor.cpp b/fdbrpc/HealthMonitor.actor.cpp
index db30979d0a..d187c9494c 100644
--- a/fdbrpc/HealthMonitor.actor.cpp
+++ b/fdbrpc/HealthMonitor.actor.cpp
@@ -36,6 +36,10 @@ void HealthMonitor::purgeOutdatedHistory() {
 			--count;
 			ASSERT(count >= 0);
 			peerClosedHistory.pop_front();
+
+			if (count == 0) {
+				peerClosedNum.erase(p.second);
+			}
 		} else {
 			break;
 		}
@@ -44,10 +48,27 @@
 bool HealthMonitor::tooManyConnectionsClosed(const NetworkAddress& peerAddress) {
 	purgeOutdatedHistory();
+	if (peerClosedNum.find(peerAddress) == peerClosedNum.end()) {
+		return false;
+	}
 	return peerClosedNum[peerAddress] > FLOW_KNOBS->HEALTH_MONITOR_CONNECTION_MAX_CLOSED;
 }
 
 int HealthMonitor::closedConnectionsCount(const NetworkAddress& peerAddress) {
 	purgeOutdatedHistory();
+	if (peerClosedNum.find(peerAddress) == peerClosedNum.end()) {
+		return 0;
+	}
 	return peerClosedNum[peerAddress];
 }
+
+std::unordered_set<NetworkAddress> HealthMonitor::getRecentClosedPeers() {
+	purgeOutdatedHistory();
+	std::unordered_set<NetworkAddress> closedPeers;
+	for (const auto& [peerAddr, count] : peerClosedNum) {
+		if (count > 0) {
+			closedPeers.insert(peerAddr);
+		}
+	}
+	return closedPeers;
+}
diff --git a/fdbrpc/HealthMonitor.h b/fdbrpc/HealthMonitor.h
index d9e2bc8ae1..0a1da323da 100644
--- a/fdbrpc/HealthMonitor.h
+++ b/fdbrpc/HealthMonitor.h
@@ -31,6 +31,7 @@ public:
 	void reportPeerClosed(const NetworkAddress& peerAddress);
 	bool tooManyConnectionsClosed(const NetworkAddress& peerAddress);
 	int closedConnectionsCount(const NetworkAddress& peerAddress);
+	std::unordered_set<NetworkAddress> getRecentClosedPeers();
 
 private:
 	void purgeOutdatedHistory();
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index b3dcd3e10f..2a83b1d5b2 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -936,6 +936,26 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<ServerDBInfo> const> dbInfo,
 					}
 				}
 			}
+
+			if (SERVER_KNOBS->WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER) {
+				// When the worker cannot connect to a remote peer, the peer may be erased from the list returned
+				// by getAllPeers(). Therefore, we also look through all the recently closed peers in the flow
+				// transport's health monitor. Note that all the closed peers stored there were closed due to
+				// connection failure, not a normal connection close. Therefore, we report all such peers if they
+				// are also part of the transaction subsystem.
+				for (const auto& address : FlowTransport::transport().healthMonitor()->getRecentClosedPeers()) {
+					if (allPeers.find(address) != allPeers.end()) {
+						// We have already checked this peer in the loop above.
+						continue;
+					}
+
+					if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
+					    (!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
+						TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
+						req.degradedPeers.push_back(address);
+					}
+				}
+			}
 		}
 
 		if (!req.degradedPeers.empty()) {
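
The HealthMonitor changes above all hang off one data-structure pair: a deque of (timestamp, address) close events plus an unordered map of per-peer counts. Below is a minimal standalone sketch of that sliding-window pattern, with hypothetical names (CloseTracker, reportClosed, recentClosedPeers) and peers reduced to plain strings; it is not FDB code, just an illustration of what purgeOutdatedHistory() and getRecentClosedPeers() do together, including the new erase-on-zero step.

// Standalone sketch (hypothetical names, not FDB's API): the sliding-window
// bookkeeping behind purgeOutdatedHistory()/getRecentClosedPeers().
#include <deque>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

class CloseTracker {
public:
    explicit CloseTracker(double windowSecs) : windowSecs_(windowSecs) {}

    // Record one connection-failure close for `peer` at time `now`.
    void reportClosed(const std::string& peer, double now) {
        history_.emplace_back(now, peer);
        ++closedNum_[peer];
    }

    // Peers with at least one recorded close inside the window.
    std::unordered_set<std::string> recentClosedPeers(double now) {
        purge(now);
        std::unordered_set<std::string> result;
        for (const auto& [peer, count] : closedNum_) {
            if (count > 0) {
                result.insert(peer);
            }
        }
        return result;
    }

private:
    // Drop events older than the window; decrement the per-peer count and,
    // as in the new purgeOutdatedHistory() code above, erase entries that
    // reach zero so the map only holds peers with closes still in the window.
    void purge(double now) {
        while (!history_.empty() && history_.front().first < now - windowSecs_) {
            const std::string peer = history_.front().second;
            history_.pop_front();
            if (--closedNum_[peer] == 0) {
                closedNum_.erase(peer);
            }
        }
    }

    double windowSecs_;
    std::deque<std::pair<double, std::string>> history_; // (close time, peer)
    std::unordered_map<std::string, int> closedNum_;     // per-peer closes in window
};

The find()-before-operator[] guards added to tooManyConnectionsClosed() and closedConnectionsCount() serve the same goal as the erase-on-zero step: without them, operator[] default-constructs a zero entry for every queried address, so peerClosedNum would grow with every peer ever asked about rather than only the peers that actually had connections closed inside the window.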