Allow worker health monitor to report recent destroyed peers who currently have roles in transaction systems

This commit is contained in:
Zhe Wu 2022-04-05 22:22:06 -07:00
parent aad21bec1c
commit 5fd494a57b
5 changed files with 46 additions and 0 deletions

View File

@ -719,6 +719,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PEER_LATENCY_DEGRADATION_THRESHOLD, 0.05 );
init( PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD, 0.1 );
init( PEER_DEGRADATION_CONNECTION_FAILURE_COUNT, 1 );
init( WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER, true );
// Test harness
init( WORKER_POLL_DELAY, 1.0 );

View File

@ -662,6 +662,9 @@ public:
double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD; // The percentage of timeout to consider a peer degraded.
int PEER_DEGRADATION_CONNECTION_FAILURE_COUNT; // The number of connection failures experienced during measurement
// period to consider a peer degraded.
bool WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER; // When enabled, the worker's health monitor also report any recent
// destroyed peers who are part of the transaction system to
// cluster controller.
// Test harness
double WORKER_POLL_DELAY;

View File

@ -36,6 +36,10 @@ void HealthMonitor::purgeOutdatedHistory() {
--count;
ASSERT(count >= 0);
peerClosedHistory.pop_front();
if (count == 0) {
peerClosedNum.erase(p.second);
}
} else {
break;
}
@ -44,10 +48,27 @@ void HealthMonitor::purgeOutdatedHistory() {
bool HealthMonitor::tooManyConnectionsClosed(const NetworkAddress& peerAddress) {
purgeOutdatedHistory();
if (peerClosedNum.find(peerAddress) == peerClosedNum.end()) {
return false;
}
return peerClosedNum[peerAddress] > FLOW_KNOBS->HEALTH_MONITOR_CONNECTION_MAX_CLOSED;
}
int HealthMonitor::closedConnectionsCount(const NetworkAddress& peerAddress) {
purgeOutdatedHistory();
if (peerClosedNum.find(peerAddress) == peerClosedNum.end()) {
return 0;
}
return peerClosedNum[peerAddress];
}
std::unordered_set<NetworkAddress> HealthMonitor::getRecentClosedPeers() {
purgeOutdatedHistory();
std::unordered_set<NetworkAddress> closedPeers;
for (const auto& [peerAddr, count] : peerClosedNum) {
if (count > 0) {
closedPeers.insert(peerAddr);
}
}
return closedPeers;
}

View File

@ -31,6 +31,7 @@ public:
void reportPeerClosed(const NetworkAddress& peerAddress);
bool tooManyConnectionsClosed(const NetworkAddress& peerAddress);
int closedConnectionsCount(const NetworkAddress& peerAddress);
std::unordered_set<NetworkAddress> getRecentClosedPeers();
private:
void purgeOutdatedHistory();

View File

@ -936,6 +936,26 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
}
}
}
if (SERVER_KNOBS->WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER) {
// When the worker cannot connect to a remote peer, the peer maybe erased from the list returned
// from getAllPeers(). Therefore, we also look through all the recent closed peers in the flow
// transport's health monitor. Note that all the closed peers stored here are caused by connection
// failure, but not normal connection close. Therefore, we report all such peers if they are also
// part of the transaction sub system.
for (const auto& address : FlowTransport::transport().healthMonitor()->getRecentClosedPeers()) {
if (allPeers.find(address) != allPeers.end()) {
// We have checked this peer in the above for loop.
continue;
}
if ((workerInPrimary && addressInDbAndPrimaryDc(address, dbInfo)) ||
(!workerInPrimary && addressInDbAndRemoteDc(address, dbInfo))) {
TraceEvent("HealthMonitorDetectRecentClosedPeer").suppressFor(30).detail("Peer", address);
req.degradedPeers.push_back(address);
}
}
}
}
if (!req.degradedPeers.empty()) {