1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-06-01 18:56:00 +08:00

reset WAN network connections every 5 minutes is responses take more than 500ms

This commit is contained in:
Evan Tschannen 2020-07-09 22:50:47 -07:00
parent 0e2f5e8bb5
commit 717242a0ee
6 changed files with 78 additions and 4 deletions

@ -471,7 +471,9 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
try {
self->transport->countConnEstablished++;
wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) );
wait( connectionWriter(self, conn) || reader || connectionMonitor(self) || self->resetConnection.onTrigger() );
TraceEvent("ConnectionReset", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
throw connection_failed();
} catch (Error& e) {
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced ||
@ -1314,6 +1316,13 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
return self->degraded;
}
void FlowTransport::resetConnection( NetworkAddress address ) {
auto peer = self->getPeer(address);
if(peer) {
peer->resetConnection.trigger();
}
}
bool FlowTransport::incompatibleOutgoingConnectionsPresent() {
return self->numIncompatibleConnections > 0;
}

@ -114,6 +114,7 @@ struct Peer : public ReferenceCounted<Peer> {
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false
Future<Void> connect;
AsyncTrigger resetPing;
AsyncTrigger resetConnection;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
@ -193,6 +194,9 @@ public:
Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy.
void resetConnection( NetworkAddress address );
// Forces the connection with this address to be reset
Reference<Peer> sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection );// { cancelReliable(sendReliable(what,destination)); }
int getEndpointCount();

@ -85,6 +85,12 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( TLOG_MAX_CREATE_DURATION, 10.0 );
init( PEEK_LOGGING_AMOUNT, 5 );
init( PEEK_LOGGING_DELAY, 5.0 );
init( PEEK_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0;
init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 0 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );

@ -87,6 +87,12 @@ public:
double TLOG_MAX_CREATE_DURATION;
int PEEK_LOGGING_AMOUNT;
double PEEK_LOGGING_DELAY;
double PEEK_RESET_INTERVAL;
double PEEK_MAX_LATENCY;
bool PEEK_COUNT_SMALL_MESSAGES;
double PEEK_STATS_INTERVAL;
double PEEK_STATS_SLOW_AMOUNT;
double PEEK_STATS_SLOW_RATIO;
// Data distribution queue
double HEALTH_POLL_TIME;

@ -396,6 +396,12 @@ struct ILogSystem {
Deque<Future<TLogPeekReply>> futureResults;
Future<Void> interfaceChanged;
double lastReset;
Future<Void> resetCheck;
int slowReplies;
int fastReplies;
int unknownReplies;
ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore );
ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, int32_t messageLength, int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag );

@ -25,14 +25,17 @@
#include "flow/actorcompiler.h" // has to be last include
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore) {
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0),
returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}
ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, int32_t messageLength, int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag )
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false)
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength), hasMsg(hasMsg),
randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
//TraceEvent("SPC_Clone", randomID);
this->results.maxKnownVersion = 0;
@ -130,6 +133,46 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
}
}
ACTOR Future<Void> resetChecker( ILogSystem::ServerPeekCursor* self, NetworkAddress addr ) {
self->slowReplies = 0;
self->unknownReplies = 0;
self->fastReplies = 0;
wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL));
TraceEvent("SlowPeekStats").detail("SlowReplies", self->slowReplies).detail("FastReplies", self->fastReplies).detail("UnknownReplies", self->unknownReplies);
if(self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT && self->slowReplies/double(self->slowReplies+self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) {
FlowTransport::transport().resetConnection(addr);
self->lastReset = now();
}
return Void();
}
ACTOR Future<TLogPeekReply> recordRequestMetrics( ILogSystem::ServerPeekCursor* self, Future<TLogPeekReply> in, NetworkAddress addr ) {
try {
state double startTime = now();
TLogPeekReply t = wait(in);
if(now()-lastReset > SERVER_KNOBS->PEEK_RESET_INTERVAL) {
if(now()-startTime > SERVER_KNOBS->PEEK_MAX_LATENCY) {
if(t.messages.size() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES || SERVER_KNOBS->PEEK_COUNT_SMALL_MESSAGES) {
if(self->resetCheck.isReady()) {
self->resetCheck = resetChecker(self, addr);
}
self->slowReplies++;
} else {
self->unknownReplies++;
}
} else {
self->fastReplies++;
}
}
return t;
} catch (Error& e) {
if (e.code() != error_code_broken_promise)
throw;
wait(Never()); // never return
throw internal_error(); // does not happen
}
}
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
if( !self->interf || self->messageVersion >= self->end ) {
if( self->hasMessage() )
@ -147,7 +190,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
try {
if (self->parallelGetMore || self->onlySpilled) {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
self->futureResults.push_back( recordRequestMetrics( self, self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
}
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
throw timed_out();