/*
 * FailureMonitorClient.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/FailureMonitorClient.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/ClusterInterface.h"

// Bookkeeping shared by every failureMonitorClientLoop started from a single
// failureMonitorClient call. It outlives any one cluster controller so that the
// next controller's first allOthersFailed reply can reset whatever the previous
// controller (or this client) marked as not failed.
struct FailureMonitorClientState : ReferenceCounted<FailureMonitorClientState> {
	// Addresses whose status in the SimpleFailureMonitor this client has moved away
	// from the default FailureStatus() (the "failed" state, per the reset logic below).
	std::set<NetworkAddress> knownAddrs;

	// Seconds to wait for a reply before declaring the controller failed. Starts at
	// CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY and is replaced by each reply's
	// considerServerFailedTimeoutMS.
	double serverFailedTimeout;

	FailureMonitorClientState() {
		serverFailedTimeout = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY;
	}
};

// Runs the failure monitoring protocol against one cluster controller.
//
// Repeatedly sends a FailureMonitoringRequest to `controller` (carrying
// senderStatus = FailureStatus(false) when `trackMyStatus` is true), applies the
// reply's status changes to `monitor`, and tracks in fmState->knownAddrs which
// addresses now hold a non-default status. If a request to a non-local controller
// is not answered within fmState->serverFailedTimeout, the controller's own address
// is marked failed; the outstanding request is kept, so a late reply still marks it
// available again.
//
// Returns Void() when the controller's endpoint reports broken_promise (the
// controller died). Any other error is traced at SevError and rethrown.
ACTOR Future<Void> failureMonitorClientLoop(
	SimpleFailureMonitor* monitor,
	ClusterInterface controller,
	Reference<FailureMonitorClientState> fmState,
	bool trackMyStatus)
{
	state Version version = 0;
	state Future<FailureMonitoringReply> request = Never();
	state Future<Void> nextRequest = delay(0, TaskFailureMonitor);
	state Future<Void> requestTimeout = Never();
	state double before = now();
	state double waitfor = 0;
	// Every status update and knownAddrs insert/erase for the controller itself must
	// use this single address. Mixing address variants, e.g. inserting a different
	// address than the one whose status was set, leaves a stale knownAddrs entry.
	// A later reset would then touch the wrong address.
	state NetworkAddress controllerAddr = controller.failureMonitoring.getEndpoint().getPrimaryAddress();

	// The cluster controller's address is treated specially because we can declare it
	// down independently of the controller's own replies. It still needs to be in
	// knownAddrs so that, if the cluster controller changes, the next controller's
	// first allOthersFailed reply resets its state.
	monitor->setStatus( controllerAddr, FailureStatus(false) );
	fmState->knownAddrs.insert( controllerAddr );

	try {
		loop {
			choose {
				when( FailureMonitoringReply reply = wait( request ) ) {
					g_network->setCurrentTask(TaskDefaultDelay);
					request = Never();
					requestTimeout = Never();

					if (reply.allOthersFailed) {
						// Reset all systems *not* mentioned in the reply to the default (failed) state
						fmState->knownAddrs.erase( controllerAddr );
						std::set<NetworkAddress> changedAddresses;
						for(int c=0; c<reply.changes.size(); c++)
							changedAddresses.insert( reply.changes[c].address );
						for(const auto& addr : fmState->knownAddrs)
							if (!changedAddresses.count( addr ))
								monitor->setStatus( addr, FailureStatus() );
						fmState->knownAddrs.clear();
					} else {
						// A reply that is not a full reset is only expected once we already
						// hold a nonzero failureInformationVersion from an earlier reply.
						ASSERT( version != 0 );
					}

					// Any reply proves the controller is up.
					if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() )
						TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id());
					monitor->setStatus( controllerAddr, FailureStatus(false) );
					fmState->knownAddrs.insert( controllerAddr );

					version = reply.failureInformationVersion;
					fmState->serverFailedTimeout = reply.considerServerFailedTimeoutMS * .001;
					for(int c=0; c<reply.changes.size(); c++) {
						monitor->setStatus( reply.changes[c].address, reply.changes[c].status );
						// Only addresses holding a non-default status need to be remembered for a later reset.
						if (reply.changes[c].status != FailureStatus())
							fmState->knownAddrs.insert( reply.changes[c].address );
						else
							fmState->knownAddrs.erase( reply.changes[c].address );
						// The controller must never report its own address as failed.
						ASSERT( reply.changes[c].address != controllerAddr || !reply.changes[c].status.failed );
					}

					// Schedule the next poll at the interval the controller asked for.
					before = now();
					waitfor = reply.clientRequestIntervalMS * .001;
					nextRequest = delayJittered( waitfor, TaskFailureMonitor );
				}
				when( wait( requestTimeout ) ) {
					g_network->setCurrentTask(TaskDefaultDelay);
					requestTimeout = Never();
					// `request` is intentionally left outstanding so a late reply is still processed.
					TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id());
					monitor->setStatus( controllerAddr, FailureStatus(true) );
					fmState->knownAddrs.erase( controllerAddr );
				}
				when( wait( nextRequest ) ) {
					g_network->setCurrentTask(TaskDefaultDelay);
					nextRequest = Never();

					// Warn (probabilistically, always past warnAlwaysThreshold) when this
					// client woke up noticeably later than the requested interval.
					double elapsed = now() - before;
					double slowThreshold = .200 + waitfor + FLOW_KNOBS->MAX_BUGGIFIED_DELAY;
					double warnAlwaysThreshold = CLIENT_KNOBS->FAILURE_MIN_DELAY/2;

					if (elapsed > slowThreshold && g_random->random01() < elapsed / warnAlwaysThreshold) {
						TraceEvent(elapsed > warnAlwaysThreshold ? SevWarnAlways : SevWarn, "FailureMonitorClientSlow").detail("Elapsed", elapsed).detail("Expected", waitfor);
					}

					FailureMonitoringRequest req;
					req.failureInformationVersion = version;
					if (trackMyStatus)
						req.senderStatus = FailureStatus(false);
					request = controller.failureMonitoring.getReply( req, TaskFailureMonitor );
					// A local controller cannot become unreachable, so only remote ones get a timeout.
					if(!controller.failureMonitoring.getEndpoint().isLocal())
						requestTimeout = delay( fmState->serverFailedTimeout, TaskFailureMonitor );
				}
			}
		}
	} catch (Error& e) {
		if (e.code() == error_code_broken_promise)  // broken promise from clustercontroller means it has died (and hopefully will be replaced)
			return Void();
		TraceEvent(SevError, "FailureMonitorClientError").error(e);
		throw;  // goes nowhere
	}
}

// Keeps one failureMonitorClientLoop running against whichever cluster controller
// `ci` currently holds (none while `ci` is empty), restarting it each time `ci`
// changes. Reassigning `client` drops the previous loop's Future, which in Flow
// cancels that actor. fmState is shared across restarts so a new controller can
// reset what the old one reported. Loops until an error or cancellation.
ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
	state SimpleFailureMonitor* monitor = static_cast<SimpleFailureMonitor*>( &IFailureMonitor::failureMonitor() );
	state Reference<FailureMonitorClientState> fmState = Reference<FailureMonitorClientState>(new FailureMonitorClientState());

	loop {
		state Future<Void> client = ci->get().present() ? failureMonitorClientLoop(monitor, ci->get().get(), fmState, trackMyStatus) : Void();
		wait( ci->onChange() );
	}
}