diff --git a/cmake/ConfigureCompiler.cmake b/cmake/ConfigureCompiler.cmake index ddb2f38792..bc5bfd321a 100644 --- a/cmake/ConfigureCompiler.cmake +++ b/cmake/ConfigureCompiler.cmake @@ -4,6 +4,7 @@ env_set(USE_GPERFTOOLS OFF BOOL "Use gperfools for profiling") env_set(USE_DTRACE ON BOOL "Enable dtrace probes on supported platforms") env_set(USE_VALGRIND OFF BOOL "Compile for valgrind usage") env_set(USE_VALGRIND_FOR_CTEST ${USE_VALGRIND} BOOL "Use valgrind for ctest") +env_set(VALGRIND_ARENA OFF BOOL "Inform valgrind about arena-allocated memory. Makes valgrind slower but more precise.") env_set(ALLOC_INSTRUMENTATION OFF BOOL "Instrument alloc") env_set(WITH_UNDODB OFF BOOL "Use rr or undodb") env_set(USE_ASAN OFF BOOL "Compile with address sanitizer") @@ -210,7 +211,10 @@ else() # -msse4.2) if (USE_VALGRIND) - add_compile_options(-DVALGRIND -DUSE_VALGRIND) + add_compile_options(-DVALGRIND=1 -DUSE_VALGRIND=1) + endif() + if (VALGRIND_ARENA) + add_compile_options(-DVALGRIND_ARENA=1) endif() if (CLANG) add_compile_options() @@ -241,7 +245,10 @@ else() -Wno-delete-non-virtual-dtor -Wno-undefined-var-template -Wno-tautological-pointer-compare - -Wno-format) + -Wno-format + -Wredundant-move + -Wpessimizing-move + ) if (USE_CCACHE) add_compile_options( -Wno-register @@ -305,3 +312,4 @@ else() endif() endif() endif() + diff --git a/documentation/sphinx/source/api-python.rst b/documentation/sphinx/source/api-python.rst index 52962c6dd3..dafd2479d1 100644 --- a/documentation/sphinx/source/api-python.rst +++ b/documentation/sphinx/source/api-python.rst @@ -1158,7 +1158,7 @@ the most part, this also implies that ``T == fdb.tuple.unpack(fdb.tuple.pack(T)) .. method:: has_incomplete_versionstamp(tuple) Returns ``True`` if there is at least one element contained within the tuple that is a - :class`Versionstamp` instance that is incomplete. If there are multiple incomplete + :class:`Versionstamp` instance that is incomplete. If there are multiple incomplete :class:`Versionstamp` instances, this method will return ``True``, but trying to pack it into a byte string will result in an error. diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index 8c8363c981..294ddc560c 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -113,6 +113,44 @@ "counter":0, "roughness":0.0 }, + "grv_latency_statistics":{ + "default":{ + "count":0, + "min":0.0, + "max":0.0, + "median":0.0, + "mean":0.0, + "p25":0.0, + "p90":0.0, + "p95":0.0, + "p99":0.0, + "p99.9":0.0 + } + }, + "read_latency_statistics":{ + "count":0, + "min":0.0, + "max":0.0, + "median":0.0, + "mean":0.0, + "p25":0.0, + "p90":0.0, + "p95":0.0, + "p99":0.0, + "p99.9":0.0 + }, + "commit_latency_statistics":{ + "count":0, + "min":0.0, + "max":0.0, + "median":0.0, + "mean":0.0, + "p25":0.0, + "p90":0.0, + "p95":0.0, + "p99":0.0, + "p99.9":0.0 + }, "grv_latency_bands":{ // How many GRV requests belong to the latency (in seconds) band (e.g., How many requests belong to [0.01,0.1] latency band). The key is the upper bound of the band and the lower bound is the next smallest band (or 0, if none). Example: {0.01: 27, 0.1: 18, 1: 1, inf: 98,filtered: 10}, we have 18 requests in [0.01, 0.1) band. "$map_key=upperBoundOfBand": 1 }, diff --git a/documentation/sphinx/source/release-notes/release-notes-620.rst b/documentation/sphinx/source/release-notes/release-notes-620.rst index c3874babcb..f674fe0787 100644 --- a/documentation/sphinx/source/release-notes/release-notes-620.rst +++ b/documentation/sphinx/source/release-notes/release-notes-620.rst @@ -5,9 +5,17 @@ Release Notes 6.2.23 ====== +Fixes +----- + +* When configured with ``usable_regions=2`` data distribution could temporarily lower the replication of a shard when moving it. `(PR #3487) `_ +* Prevent data distribution from running out of memory by fetching the source servers for too many shards in parallel. `(PR #3487) `_ +* Reset network connections between log routers and satellite tlogs if the latencies are larger than 500ms. `(PR #3487) `_ + Status ------ +* Added per-process server request latency statistics reported in the role section of relevant processes. These are named ``grv_latency_statistics`` and ``commit_latency_statistics`` on proxy roles and ``read_latency_statistics`` on storage roles. `(PR #3480) `_ * Added ``cluster.active_primary_dc`` that indicates which datacenter is serving as the primary datacenter in multi-region setups. `(PR #3320) `_ 6.2.22 diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 7216015535..066c235c8b 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -32,7 +32,7 @@ #include "fdbserver/RatekeeperInterface.h" #include "fdbclient/TagThrottle.h" -#include "flow/Stats.h" +#include "fdbrpc/Stats.h" #include "fdbrpc/TimedRequest.h" struct MasterProxyInterface { diff --git a/fdbclient/RestoreWorkerInterface.actor.h b/fdbclient/RestoreWorkerInterface.actor.h index 61690f951e..c7eda2af8c 100644 --- a/fdbclient/RestoreWorkerInterface.actor.h +++ b/fdbclient/RestoreWorkerInterface.actor.h @@ -30,10 +30,10 @@ #include #include -#include "flow/Stats.h" #include "flow/flow.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" +#include "fdbrpc/Stats.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbserver/CoordinationInterface.h" diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 63e4caed4a..c6d931ec9c 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -136,6 +136,44 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "counter":0, "roughness":0.0 }, + "grv_latency_statistics":{ + "default":{ + "count":0, + "min":0.0, + "max":0.0, + "median":0.0, + "mean":0.0, + "p25":0.0, + "p90":0.0, + "p95":0.0, + "p99":0.0, + "p99.9":0.0 + } + }, + "read_latency_statistics":{ + "count":0, + "min":0.0, + "max":0.0, + "median":0.0, + "mean":0.0, + "p25":0.0, + "p90":0.0, + "p95":0.0, + "p99":0.0, + "p99.9":0.0 + }, + "commit_latency_statistics":{ + "count":0, + "min":0.0, + "max":0.0, + "median":0.0, + "mean":0.0, + "p25":0.0, + "p90":0.0, + "p95":0.0, + "p99":0.0, + "p99.9":0.0 + }, "grv_latency_bands":{ "$map": 1 }, diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 7f88832a98..7492999d64 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -27,7 +27,7 @@ #include "fdbrpc/QueueModel.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/LoadBalance.actor.h" -#include "flow/Stats.h" +#include "fdbrpc/Stats.h" #include "fdbrpc/TimedRequest.h" #include "fdbclient/TagThrottle.h" diff --git a/fdbrpc/CMakeLists.txt b/fdbrpc/CMakeLists.txt index 5358221728..29d1371075 100644 --- a/fdbrpc/CMakeLists.txt +++ b/fdbrpc/CMakeLists.txt @@ -23,6 +23,8 @@ set(FDBRPC_SRCS ReplicationPolicy.cpp ReplicationTypes.cpp ReplicationUtils.cpp + Stats.actor.cpp + Stats.h sim2.actor.cpp sim_validation.cpp TimedRequest.h diff --git a/fdbrpc/ContinuousSample.h b/fdbrpc/ContinuousSample.h index 7f28928165..9b07e1c4ce 100644 --- a/fdbrpc/ContinuousSample.h +++ b/fdbrpc/ContinuousSample.h @@ -50,7 +50,7 @@ public: return *this; } - double mean() { + double mean() const { if (!samples.size()) return 0; T sum = 0; for( int c = 0; c < samples.size(); c++ ) @@ -70,8 +70,8 @@ public: return samples[ idx ]; } - T min() { return _min; } - T max() { return _max; } + T min() const { return _min; } + T max() const { return _max; } void clear() { samples.clear(); @@ -80,6 +80,10 @@ public: _min = _max = 0; // Doesn't work for all T } + uint64_t getPopulationSize() const { + return populationSize; + } + private: int sampleSize; uint64_t populationSize; diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 8b594ed519..5264b5b024 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -563,7 +563,9 @@ ACTOR Future connectionKeeper( Reference self, self->transport->countConnEstablished++; if (!delayedHealthUpdateF.isValid()) delayedHealthUpdateF = delayedHealthUpdate(self->destination); - wait(connectionWriter(self, conn) || reader || connectionMonitor(self)); + wait(connectionWriter(self, conn) || reader || connectionMonitor(self) || self->resetConnection.onTrigger()); + TraceEvent("ConnectionReset", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); + throw connection_failed(); } catch (Error& e) { if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || e.code() == error_code_connection_unreferenced || @@ -574,8 +576,6 @@ ACTOR Future connectionKeeper( Reference self, throw e; } - - ASSERT( false ); } catch (Error& e) { delayedHealthUpdateF.cancel(); if(now() - self->lastConnectTime > FLOW_KNOBS->RECONNECTION_RESET_TIME) { @@ -1439,6 +1439,13 @@ Reference> FlowTransport::getDegraded() { return self->degraded; } +void FlowTransport::resetConnection( NetworkAddress address ) { + auto peer = self->getPeer(address); + if(peer) { + peer->resetConnection.trigger(); + } +} + bool FlowTransport::incompatibleOutgoingConnectionsPresent() { return self->numIncompatibleConnections > 0; } diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index fce0f61a9a..8b82e6b1ab 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -132,6 +132,7 @@ struct Peer : public ReferenceCounted { AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false Future connect; AsyncTrigger resetPing; + AsyncTrigger resetConnection; bool compatible; bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send double lastConnectTime; @@ -213,6 +214,9 @@ public: Reference> getDegraded(); // This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy. + void resetConnection( NetworkAddress address ); + // Forces the connection with this address to be reset + Reference sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection );// { cancelReliable(sendReliable(what,destination)); } bool incompatibleOutgoingConnectionsPresent(); diff --git a/flow/Stats.actor.cpp b/fdbrpc/Stats.actor.cpp similarity index 99% rename from flow/Stats.actor.cpp rename to fdbrpc/Stats.actor.cpp index 62774148d4..423ca8e0e5 100644 --- a/flow/Stats.actor.cpp +++ b/fdbrpc/Stats.actor.cpp @@ -18,7 +18,7 @@ * limitations under the License. */ -#include "flow/Stats.h" +#include "fdbrpc/Stats.h" #include "flow/actorcompiler.h" // has to be last include Counter::Counter(std::string const& name, CounterCollection& collection) diff --git a/flow/Stats.h b/fdbrpc/Stats.h similarity index 84% rename from flow/Stats.h rename to fdbrpc/Stats.h index 0120b5c208..f06bee00de 100644 --- a/flow/Stats.h +++ b/fdbrpc/Stats.h @@ -18,8 +18,8 @@ * limitations under the License. */ -#ifndef FLOW_STATS_H -#define FLOW_STATS_H +#ifndef FDBRPC_STATS_H +#define FDBRPC_STATS_H #pragma once // Yet another performance statistics interface @@ -37,6 +37,7 @@ MyCounters() : foo("foo", cc), bar("bar", cc), baz("baz", cc) {} #include #include "flow/flow.h" #include "flow/TDMetric.actor.h" +#include "fdbrpc/ContinuousSample.h" struct ICounter { // All counters have a name and value @@ -211,4 +212,43 @@ private: bands.insert(std::make_pair(value, new Counter(format("Band%f", value), *cc))); } }; + +class LatencySample { +public: + LatencySample(std::string name, UID id, double loggingInterval, int sampleSize) : name(name), id(id), sample(sampleSize), sampleStart(now()) { + logger = recurring([this](){ logSample(); }, loggingInterval); + } + + void addMeasurement(double measurement) { + sample.addSample(measurement); + } + +private: + std::string name; + UID id; + double sampleStart; + + ContinuousSample sample; + Future logger; + + void logSample() { + TraceEvent(name.c_str(), id) + .detail("Count", sample.getPopulationSize()) + .detail("Elapsed", now() - sampleStart) + .detail("Min", sample.min()) + .detail("Max", sample.max()) + .detail("Mean", sample.mean()) + .detail("Median", sample.median()) + .detail("P25", sample.percentile(0.25)) + .detail("P90", sample.percentile(0.9)) + .detail("P95", sample.percentile(0.95)) + .detail("P99", sample.percentile(0.99)) + .detail("P99.9", sample.percentile(0.999)) + .trackLatest(id.toString() + "/" + name); + + sample.clear(); + sampleStart = now(); + } +}; + #endif diff --git a/fdbrpc/batcher.actor.h b/fdbrpc/batcher.actor.h index 72a9bc9094..14e00f2ac4 100644 --- a/fdbrpc/batcher.actor.h +++ b/fdbrpc/batcher.actor.h @@ -27,8 +27,8 @@ #elif !defined(FLOW_BATCHER_ACTOR_H) #define FLOW_BATCHER_ACTOR_H +#include "fdbrpc/Stats.h" #include "flow/flow.h" -#include "flow/Stats.h" #include "flow/actorcompiler.h" // This must be the last #include. template diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 65bbe60f3b..fbfcb2eb77 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -768,6 +768,14 @@ struct DDTeamCollection : ReferenceCounted { } } + bool foundSrc = false; + for( int i = 0; i < req.src.size(); i++ ) { + if( self->server_info.count( req.src[i] ) ) { + foundSrc = true; + break; + } + } + // Select the best team // Currently the metric is minimum used disk space (adjusted for data in flight) // Only healthy teams may be selected. The team has to be healthy at the moment we update @@ -778,7 +786,7 @@ struct DDTeamCollection : ReferenceCounted { // self->teams.size() can be 0 under the ConfigureTest.txt test when we change configurations // The situation happens rarely. We may want to eliminate this situation someday if( !self->teams.size() ) { - req.reply.send( Optional>() ); + req.reply.send( std::make_pair(Optional>(), foundSrc) ); return Void(); } @@ -804,7 +812,8 @@ struct DDTeamCollection : ReferenceCounted { } } if(found && teamList[j]->isHealthy()) { - req.reply.send( teamList[j] ); + bestOption = teamList[j]; + req.reply.send( std::make_pair(bestOption, foundSrc) ); return Void(); } } @@ -895,7 +904,8 @@ struct DDTeamCollection : ReferenceCounted { } } if(found) { - req.reply.send( teamList[j] ); + bestOption = teamList[j]; + req.reply.send( std::make_pair(bestOption, foundSrc) ); return Void(); } } @@ -906,7 +916,7 @@ struct DDTeamCollection : ReferenceCounted { // self->traceAllInfo(true); // } - req.reply.send( bestOption ); + req.reply.send( std::make_pair(bestOption, foundSrc) ); return Void(); } catch( Error &e ) { diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index 116d6a9234..7dbefe337b 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -77,7 +77,8 @@ struct GetTeamRequest { bool teamMustHaveShards; double inflightPenalty; std::vector completeSources; - Promise< Optional< Reference > > reply; + std::vector src; + Promise< std::pair>,bool> > reply; GetTeamRequest() {} GetTeamRequest( bool wantsNewServers, bool wantsTrueBest, bool preferLowerUtilization, bool teamMustHaveShards, double inflightPenalty = 1.0 ) diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 6f189f4253..d9f178fbd0 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -358,6 +358,7 @@ struct DDQueueData { FlowLock startMoveKeysParallelismLock; FlowLock finishMoveKeysParallelismLock; + Reference fetchSourceLock; int activeRelocations; int queuedRelocations; @@ -425,7 +426,7 @@ struct DDQueueData { activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ), shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), distributorId( mid ), lock( lock ), cx( cx ), teamSize( teamSize ), singleRegionTeamSize( singleRegionTeamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), - finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited), + finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), fetchSourceLock( new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM) ), lastLimited(lastLimited), suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar(false) ) {} void validate() { @@ -531,7 +532,7 @@ struct DDQueueData { } } - ACTOR Future getSourceServersForRange( Database cx, RelocateData input, PromiseStream output ) { + ACTOR Future getSourceServersForRange( Database cx, RelocateData input, PromiseStream output, Reference fetchLock ) { state std::set servers; state Transaction tr(cx); @@ -542,6 +543,9 @@ struct DDQueueData { wait( delay( 0.0001, TaskPriority::DataDistributionLaunch ) ); } + wait( fetchLock->take( TaskPriority::DataDistributionLaunch ) ); + state FlowLock::Releaser releaser( *fetchLock ); + loop { servers.clear(); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); @@ -683,7 +687,7 @@ struct DDQueueData { startRelocation(rrs.priority, rrs.healthPriority); fetchingSourcesQueue.insert( rrs ); - getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete ) ); + getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete, fetchSourceLock ) ); } else { RelocateData newData( rrs ); newData.keys = affectedQueuedItems[r]; @@ -944,30 +948,26 @@ ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd if(rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT; auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, false, inflightPenalty); + req.src = rd.src; req.completeSources = rd.completeSources; - Optional> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req))); + std::pair>,bool> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req))); // If a DC has no healthy team, we stop checking the other DCs until // the unhealthy DC is healthy again or is excluded. - if(!bestTeam.present()) { + if(!bestTeam.first.present()) { foundTeams = false; break; } - if(!bestTeam.get()->isHealthy()) { + if(!bestTeam.first.get()->isHealthy()) { allHealthy = false; } else { anyHealthy = true; } - bool foundSource = false; - if(!rd.wantsNewServers && self->teamCollections.size() > 1) { - for(auto& it : bestTeam.get()->getServerIDs()) { - if(std::find(rd.src.begin(), rd.src.end(), it) != rd.src.end()) { - foundSource = true; - anyWithSource = true; - break; - } - } + + if(bestTeam.second) { + anyWithSource = true; } - bestTeams.push_back(std::make_pair(bestTeam.get(), foundSource)); + + bestTeams.push_back(std::make_pair(bestTeam.first.get(), bestTeam.second)); tciIndex++; } if (foundTeams && anyHealthy) { @@ -1223,7 +1223,7 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd state double lastRead = 0; state bool skipCurrentLoop = false; loop { - state Optional> randomTeam; + state std::pair>, bool> randomTeam; state bool moved = false; state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId); traceEvent.suppressFor(5.0) @@ -1259,26 +1259,26 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]); if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] < SERVER_KNOBS->DD_REBALANCE_PARALLELISM) { - Optional> _randomTeam = wait(brokenPromiseToNever( + std::pair>,bool> _randomTeam = wait(brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true, false)))); randomTeam = _randomTeam; - traceEvent.detail("DestTeam", printable(randomTeam.map([](const Reference& team){ + traceEvent.detail("DestTeam", printable(randomTeam.first.map([](const Reference& team){ return team->getDesc(); }))); - if (randomTeam.present()) { - Optional> loadedTeam = + if (randomTeam.first.present()) { + std::pair>,bool> loadedTeam = wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply( GetTeamRequest(true, true, false, true)))); - traceEvent.detail("SourceTeam", printable(loadedTeam.map([](const Reference& team){ + traceEvent.detail("SourceTeam", printable(loadedTeam.first.map([](const Reference& team){ return team->getDesc(); }))); - if (loadedTeam.present()) { + if (loadedTeam.first.present()) { bool _moved = - wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(), - randomTeam.get(), teamCollectionIndex == 0, &traceEvent)); + wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.first.get(), + randomTeam.first.get(), teamCollectionIndex == 0, &traceEvent)); moved = _moved; if (moved) { resetCount = 0; @@ -1323,7 +1323,7 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) state bool skipCurrentLoop = false; loop { - state Optional> randomTeam; + state std::pair>, bool> randomTeam; state bool moved = false; state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId); traceEvent.suppressFor(5.0) @@ -1359,25 +1359,25 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]); if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] < SERVER_KNOBS->DD_REBALANCE_PARALLELISM) { - Optional> _randomTeam = wait(brokenPromiseToNever( + std::pair>,bool> _randomTeam = wait(brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false, true)))); randomTeam = _randomTeam; - traceEvent.detail("SourceTeam", printable(randomTeam.map([](const Reference& team){ + traceEvent.detail("SourceTeam", printable(randomTeam.first.map([](const Reference& team){ return team->getDesc(); }))); - if (randomTeam.present()) { - Optional> unloadedTeam = wait(brokenPromiseToNever( + if (randomTeam.first.present()) { + std::pair>,bool> unloadedTeam = wait(brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, true, true, false)))); - traceEvent.detail("DestTeam", printable(unloadedTeam.map([](const Reference& team){ + traceEvent.detail("DestTeam", printable(unloadedTeam.first.map([](const Reference& team){ return team->getDesc(); }))); - if (unloadedTeam.present()) { + if (unloadedTeam.first.present()) { bool _moved = - wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(), - unloadedTeam.get(), teamCollectionIndex == 0, &traceEvent)); + wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.first.get(), + unloadedTeam.first.get(), teamCollectionIndex == 0, &traceEvent)); moved = _moved; if (moved) { resetCount = 0; diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index cc6614fcb7..6193d64150 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -90,6 +90,12 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( TLOG_MAX_CREATE_DURATION, 10.0 ); init( PEEK_LOGGING_AMOUNT, 5 ); init( PEEK_LOGGING_DELAY, 5.0 ); + init( PEEK_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0; + init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0; + init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true; + init( PEEK_STATS_INTERVAL, 10.0 ); + init( PEEK_STATS_SLOW_AMOUNT, 0 ); + init( PEEK_STATS_SLOW_RATIO, 0.5 ); // disk snapshot max timeout, to be put in TLog, storage and coordinator nodes init( SNAP_CREATE_MAX_TIMEOUT, 300.0 ); @@ -216,6 +222,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( DD_SHARD_SIZE_GRANULARITY, 5000000 ); init( DD_SHARD_SIZE_GRANULARITY_SIM, 500000 ); if( randomize && BUGGIFY ) DD_SHARD_SIZE_GRANULARITY_SIM = 0; init( DD_MOVE_KEYS_PARALLELISM, 15 ); if( randomize && BUGGIFY ) DD_MOVE_KEYS_PARALLELISM = 1; + init( DD_FETCH_SOURCE_PARALLELISM, 1000 ); if( randomize && BUGGIFY ) DD_FETCH_SOURCE_PARALLELISM = 1; init( DD_MERGE_LIMIT, 2000 ); if( randomize && BUGGIFY ) DD_MERGE_LIMIT = 2; init( DD_SHARD_METRICS_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) DD_SHARD_METRICS_TIMEOUT = 0.1; init( DD_LOCATION_CACHE_SIZE, 2000000 ); if( randomize && BUGGIFY ) DD_LOCATION_CACHE_SIZE = 3; @@ -639,6 +646,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( REDWOOD_REMAP_CLEANUP_VERSION_LAG_MIN, 4 ); init( REDWOOD_REMAP_CLEANUP_VERSION_LAG_MAX, 15 ); init( REDWOOD_LOGGING_INTERVAL, 5.0 ); + + // Server request latency measurement + init( LATENCY_SAMPLE_SIZE, 100000 ); + init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 ); // clang-format on diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 6ed5547257..397f727dbc 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -88,6 +88,12 @@ public: double TLOG_MAX_CREATE_DURATION; int PEEK_LOGGING_AMOUNT; double PEEK_LOGGING_DELAY; + double PEEK_RESET_INTERVAL; + double PEEK_MAX_LATENCY; + bool PEEK_COUNT_SMALL_MESSAGES; + double PEEK_STATS_INTERVAL; + double PEEK_STATS_SLOW_AMOUNT; + double PEEK_STATS_SLOW_RATIO; // Data distribution queue double HEALTH_POLL_TIME; @@ -165,6 +171,7 @@ public: int64_t DD_SHARD_SIZE_GRANULARITY; int64_t DD_SHARD_SIZE_GRANULARITY_SIM; int DD_MOVE_KEYS_PARALLELISM; + int DD_FETCH_SOURCE_PARALLELISM; int DD_MERGE_LIMIT; double DD_SHARD_METRICS_TIMEOUT; int64_t DD_LOCATION_CACHE_SIZE; @@ -571,6 +578,10 @@ public: int REDWOOD_REMAP_CLEANUP_VERSION_LAG_MIN; // Number of versions between head of remap queue and oldest retained version before remap cleanup starts int REDWOOD_REMAP_CLEANUP_VERSION_LAG_MAX; // Number of versions between head of remap queue and oldest retained version before remap cleanup may stop double REDWOOD_LOGGING_INTERVAL; + + // Server request latency measurement + int LATENCY_SAMPLE_SIZE; + double LATENCY_METRICS_LOGGING_INTERVAL; ServerKnobs(); void initialize(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false); diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 6b4d8411ec..3f12860cf1 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -20,6 +20,7 @@ #include "flow/ActorCollection.h" #include "fdbclient/NativeAPI.actor.h" +#include "fdbrpc/Stats.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/Knobs.h" @@ -30,7 +31,6 @@ #include "fdbserver/RecoveryState.h" #include "fdbclient/Atomic.h" #include "flow/TDMetric.actor.h" -#include "flow/Stats.h" #include "flow/actorcompiler.h" // This must be the last #include. struct LogRouterData { diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index b6ed3c0493..bf91f94e0d 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -406,6 +406,12 @@ struct ILogSystem { Deque> futureResults; Future interfaceChanged; + double lastReset; + Future resetCheck; + int slowReplies; + int fastReplies; + int unknownReplies; + ServerPeekCursor( Reference>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore ); ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag ); diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 51880f5064..dcc02344da 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -25,14 +25,17 @@ #include "flow/actorcompiler.h" // has to be last include ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore ) - : interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore) { + : interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0), + returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void()) +{ this->results.maxKnownVersion = 0; this->results.minKnownCommittedVersion = 0; //TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace(); } ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag ) - : results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageAndTags(message), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false) + : results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageAndTags(message), hasMsg(hasMsg), + randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void()) { //TraceEvent("SPC_Clone", randomID); this->results.maxKnownVersion = 0; @@ -131,6 +134,46 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) { } } +ACTOR Future resetChecker( ILogSystem::ServerPeekCursor* self, NetworkAddress addr ) { + self->slowReplies = 0; + self->unknownReplies = 0; + self->fastReplies = 0; + wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL)); + TraceEvent("SlowPeekStats").detail("SlowReplies", self->slowReplies).detail("FastReplies", self->fastReplies).detail("UnknownReplies", self->unknownReplies); + if(self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT && self->slowReplies/double(self->slowReplies+self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) { + FlowTransport::transport().resetConnection(addr); + self->lastReset = now(); + } + return Void(); +} + +ACTOR Future recordRequestMetrics( ILogSystem::ServerPeekCursor* self, NetworkAddress addr, Future in ) { + try { + state double startTime = now(); + TLogPeekReply t = wait(in); + if(now()-self->lastReset > SERVER_KNOBS->PEEK_RESET_INTERVAL) { + if(now()-startTime > SERVER_KNOBS->PEEK_MAX_LATENCY) { + if(t.messages.size() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES || SERVER_KNOBS->PEEK_COUNT_SMALL_MESSAGES) { + if(self->resetCheck.isReady()) { + self->resetCheck = resetChecker(self, addr); + } + self->slowReplies++; + } else { + self->unknownReplies++; + } + } else { + self->fastReplies++; + } + } + return t; + } catch (Error& e) { + if (e.code() != error_code_broken_promise) + throw; + wait(Never()); // never return + throw internal_error(); // does not happen + } +} + ACTOR Future serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) { if( !self->interf || self->messageVersion >= self->end ) { if( self->hasMessage() ) @@ -148,7 +191,7 @@ ACTOR Future serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self try { if (self->parallelGetMore || self->onlySpilled) { while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) { - self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) ); + self->futureResults.push_back( recordRequestMetrics( self, self->interf->get().interf().peekMessages.getEndpoint().getPrimaryAddress(), self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) ); } if (self->sequence == std::numeric_limitssequence)>::max()) { throw operation_obsolete(); diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 97b55cc42d..7cd63bba7b 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -28,6 +28,7 @@ #include "fdbclient/Notified.h" #include "fdbclient/SystemData.h" #include "fdbrpc/sim_validation.h" +#include "fdbrpc/Stats.h" #include "fdbserver/ApplyMetadataMutation.h" #include "fdbserver/ConflictSet.h" #include "fdbserver/DataDistributorInterface.h" @@ -44,7 +45,6 @@ #include "fdbserver/WorkerInterface.actor.h" #include "flow/ActorCollection.h" #include "flow/Knobs.h" -#include "flow/Stats.h" #include "flow/TDMetric.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -88,6 +88,9 @@ struct ProxyStats { Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors; Version lastCommitVersionAssigned; + LatencySample commitLatencySample; + LatencySample grvLatencySample; + LatencyBands commitLatencyBands; LatencyBands grvLatencyBands; @@ -136,6 +139,8 @@ struct ProxyStats { conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc), lastCommitVersionAssigned(0), + commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;}); @@ -1331,9 +1336,11 @@ ACTOR Future commitBatch( for (int resolverInd : transactionResolverMap[t]) nextTr[resolverInd]++; // TODO: filter if pipelined with large commit + double duration = endTime - trs[t].requestTime(); + self->stats.commitLatencySample.addMeasurement(duration); if(self->latencyBandConfig.present()) { bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits::max()); - self->stats.commitLatencyBands.addMeasurement(endTime - trs[t].requestTime(), filter); + self->stats.commitLatencyBands.addMeasurement(duration, filter); } } @@ -1439,8 +1446,12 @@ ACTOR Future sendGrvReplies(Future replyFuture, std:: double end = g_network->timer(); for(GetReadVersionRequest const& request : requests) { + double duration = end - request.requestTime(); + if(request.priority == TransactionPriority::DEFAULT) { + stats->grvLatencySample.addMeasurement(duration); + } if(request.priority >= TransactionPriority::DEFAULT) { - stats->grvLatencyBands.addMeasurement(end - request.requestTime()); + stats->grvLatencyBands.addMeasurement(duration); } if (request.flags & GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION) { diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index c0ae249076..459dcd3115 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -19,7 +19,6 @@ */ #include "flow/Hash3.h" -#include "flow/Stats.h" #include "flow/UnitTest.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" @@ -33,6 +32,7 @@ #include "fdbrpc/FailureMonitor.h" #include "fdbserver/IDiskQueue.h" #include "fdbrpc/sim_validation.h" +#include "fdbrpc/Stats.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/LogSystem.h" #include "fdbserver/WaitFailure.h" diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index d87daaee79..2577e02a06 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -19,7 +19,6 @@ */ #include "flow/Hash3.h" -#include "flow/Stats.h" #include "flow/UnitTest.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" @@ -35,6 +34,7 @@ #include "fdbserver/IDiskQueue.h" #include "fdbrpc/sim_validation.h" #include "fdbrpc/simulator.h" +#include "fdbrpc/Stats.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/LogSystem.h" #include "fdbserver/WaitFailure.h" diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 5925010e78..6796573328 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -20,7 +20,6 @@ */ #include "flow/Hash3.h" -#include "flow/Stats.h" #include "flow/UnitTest.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" @@ -36,6 +35,7 @@ #include "fdbserver/IDiskQueue.h" #include "fdbrpc/sim_validation.h" #include "fdbrpc/simulator.h" +#include "fdbrpc/Stats.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/LogSystem.h" #include "fdbserver/WaitFailure.h" diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 570f2a6418..9faffd113b 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -28,12 +28,12 @@ #define FDBSERVER_RESTORE_APPLIER_H #include -#include "flow/Stats.h" #include "fdbclient/Atomic.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" +#include "fdbrpc/Stats.h" #include "fdbserver/CoordinationInterface.h" #include "fdbclient/RestoreWorkerInterface.actor.h" #include "fdbserver/RestoreUtil.h" diff --git a/fdbserver/RestoreLoader.actor.h b/fdbserver/RestoreLoader.actor.h index e9bcbe7819..c6000bba9b 100644 --- a/fdbserver/RestoreLoader.actor.h +++ b/fdbserver/RestoreLoader.actor.h @@ -28,10 +28,10 @@ #define FDBSERVER_RESTORE_LOADER_H #include -#include "flow/Stats.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" +#include "fdbrpc/Stats.h" #include "fdbserver/CoordinationInterface.h" #include "fdbrpc/Locality.h" #include "fdbclient/RestoreWorkerInterface.actor.h" diff --git a/fdbserver/RestoreMaster.actor.h b/fdbserver/RestoreMaster.actor.h index 4012508e9c..c3b854aeae 100644 --- a/fdbserver/RestoreMaster.actor.h +++ b/fdbserver/RestoreMaster.actor.h @@ -28,12 +28,12 @@ #define FDBSERVER_RESTORE_MASTER_H #include -#include "flow/Stats.h" #include "flow/Platform.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" +#include "fdbrpc/Stats.h" #include "fdbserver/CoordinationInterface.h" #include "fdbserver/RestoreUtil.h" #include "fdbserver/RestoreRoleCommon.actor.h" diff --git a/fdbserver/RestoreRoleCommon.actor.h b/fdbserver/RestoreRoleCommon.actor.h index 6871346831..fe788a75bf 100644 --- a/fdbserver/RestoreRoleCommon.actor.h +++ b/fdbserver/RestoreRoleCommon.actor.h @@ -29,13 +29,13 @@ #define FDBSERVER_RestoreRoleCommon_H #include -#include "flow/Stats.h" #include "flow/SystemMonitor.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbclient/Notified.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" +#include "fdbrpc/Stats.h" #include "fdbserver/CoordinationInterface.h" #include "fdbclient/RestoreWorkerInterface.actor.h" #include "fdbserver/RestoreUtil.h" diff --git a/fdbserver/RestoreUtil.h b/fdbserver/RestoreUtil.h index e5833a9fca..b70fe08d6c 100644 --- a/fdbserver/RestoreUtil.h +++ b/fdbserver/RestoreUtil.h @@ -29,10 +29,10 @@ #include "fdbclient/Tuple.h" #include "fdbclient/CommitTransaction.h" #include "flow/flow.h" -#include "flow/Stats.h" #include "fdbrpc/TimedRequest.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/IAsyncFile.h" +#include "fdbrpc/Stats.h" #include #include diff --git a/fdbserver/RestoreWorker.actor.h b/fdbserver/RestoreWorker.actor.h index 856fb4f5bd..e4f7f49af9 100644 --- a/fdbserver/RestoreWorker.actor.h +++ b/fdbserver/RestoreWorker.actor.h @@ -28,9 +28,9 @@ #include "fdbclient/Tuple.h" #include "flow/flow.h" -#include "flow/Stats.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/IAsyncFile.h" +#include "fdbrpc/Stats.h" #include #include diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 2e7f17b21c..30194ad945 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -388,8 +388,24 @@ struct MachineMemoryInfo { struct RolesInfo { std::multimap roles; + JsonBuilderObject addLatencyStatistics(TraceEventFields const& metrics) { + JsonBuilderObject latencyStats; + latencyStats.setKeyRawNumber("count", metrics.getValue("Count")); + latencyStats.setKeyRawNumber("min", metrics.getValue("Min")); + latencyStats.setKeyRawNumber("max", metrics.getValue("Max")); + latencyStats.setKeyRawNumber("median", metrics.getValue("Median")); + latencyStats.setKeyRawNumber("mean", metrics.getValue("Mean")); + latencyStats.setKeyRawNumber("p25", metrics.getValue("P25")); + latencyStats.setKeyRawNumber("p90", metrics.getValue("P90")); + latencyStats.setKeyRawNumber("p95", metrics.getValue("P95")); + latencyStats.setKeyRawNumber("p99", metrics.getValue("P99")); + latencyStats.setKeyRawNumber("p99.9", metrics.getValue("P99.9")); + + return latencyStats; + } + JsonBuilderObject addLatencyBandInfo(TraceEventFields const& metrics) { - JsonBuilderObject latency; + JsonBuilderObject latencyBands; std::map bands; for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) { @@ -404,10 +420,10 @@ struct RolesInfo { continue; } - latency[band] = StatusCounter(itr->second).getCounter(); + latencyBands[band] = StatusCounter(itr->second).getCounter(); } - return latency; + return latencyBands; } JsonBuilderObject& addRole( NetworkAddress address, std::string const& role, UID id) { @@ -461,7 +477,12 @@ struct RolesInfo { TraceEventFields const& readLatencyMetrics = metrics.at("ReadLatencyMetrics"); if(readLatencyMetrics.size()) { - obj["read_latency_bands"] = addLatencyBandInfo(readLatencyMetrics); + obj["read_latency_statistics"] = addLatencyStatistics(readLatencyMetrics); + } + + TraceEventFields const& readLatencyBands = metrics.at("ReadLatencyBands"); + if(readLatencyBands.size()) { + obj["read_latency_bands"] = addLatencyBandInfo(readLatencyBands); } obj["data_lag"] = getLagObject(versionLag); @@ -536,12 +557,25 @@ struct RolesInfo { try { TraceEventFields const& grvLatencyMetrics = metrics.at("GRVLatencyMetrics"); if(grvLatencyMetrics.size()) { - obj["grv_latency_bands"] = addLatencyBandInfo(grvLatencyMetrics); + JsonBuilderObject priorityStats; + // We only report default priority now, but this allows us to add other priorities if we want them + priorityStats["default"] = addLatencyStatistics(grvLatencyMetrics); + obj["grv_latency_statistics"] = priorityStats; + } + + TraceEventFields const& grvLatencyBands = metrics.at("GRVLatencyBands"); + if(grvLatencyBands.size()) { + obj["grv_latency_bands"] = addLatencyBandInfo(grvLatencyBands); } TraceEventFields const& commitLatencyMetrics = metrics.at("CommitLatencyMetrics"); if(commitLatencyMetrics.size()) { - obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyMetrics); + obj["commit_latency_statistics"] = addLatencyStatistics(commitLatencyMetrics); + } + + TraceEventFields const& commitLatencyBands = metrics.at("CommitLatencyBands"); + if(commitLatencyBands.size()) { + obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyBands); } } catch (Error &e) { if(e.code() != error_code_attribute_not_found) { @@ -1552,7 +1586,7 @@ static Future>> getServerMetrics(vector ACTOR static Future>> getStorageServersAndMetrics(Database cx, std::unordered_map address_workers) { vector servers = wait(timeoutError(getStorageServers(cx, true), 5.0)); vector> results = wait( - getServerMetrics(servers, address_workers, std::vector{ "StorageMetrics", "ReadLatencyMetrics", "BusiestReadTag" })); + getServerMetrics(servers, address_workers, std::vector{ "StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" })); return results; } @@ -1567,7 +1601,7 @@ ACTOR static Future>> getTLogsAndMetri ACTOR static Future>> getProxiesAndMetrics(Reference> db, std::unordered_map address_workers) { vector> results = wait(getServerMetrics( - db->get().client.proxies, address_workers, std::vector{ "GRVLatencyMetrics", "CommitLatencyMetrics" })); + db->get().client.proxies, address_workers, std::vector{ "GRVLatencyMetrics", "CommitLatencyMetrics", "GRVLatencyBands", "CommitLatencyBands" })); return results; } diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 5f22090664..bd96366758 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -19,7 +19,6 @@ */ #include "flow/Hash3.h" -#include "flow/Stats.h" #include "flow/UnitTest.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" @@ -36,6 +35,7 @@ #include "fdbserver/IDiskQueue.h" #include "fdbrpc/sim_validation.h" #include "fdbrpc/simulator.h" +#include "fdbrpc/Stats.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/LogSystem.h" #include "fdbserver/WaitFailure.h" diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 2a54e4cc80..c6ac3ef4b8 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -50,7 +50,7 @@ #include "fdbserver/WorkerInterface.actor.h" #include "fdbrpc/sim_validation.h" #include "fdbrpc/Smoother.h" -#include "flow/Stats.h" +#include "fdbrpc/Stats.h" #include "flow/TDMetric.actor.h" #include #include "flow/actorcompiler.h" // This must be the last #include. @@ -534,6 +534,7 @@ public: Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount; Counter readsRejected; + LatencySample readLatencySample; LatencyBands readLatencyBands; Counters(StorageServer* self) @@ -564,7 +565,8 @@ public: fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc), - readLatencyBands("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) + readLatencySample("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; }); specialCounter(cc, "Version", [self](){ return self->version.get(); }); @@ -994,9 +996,12 @@ ACTOR Future getValueQ( StorageServer* data, GetValueRequest req ) { ++data->counters.finishedQueries; --data->readQueueSizeMetric; + + double duration = timer() - req.requestTime(); + data->counters.readLatencySample.addMeasurement(duration); if(data->latencyBandConfig.present()) { int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits::max()); - data->counters.readLatencyBands.addMeasurement(timer() - req.requestTime(), resultSize > maxReadBytes); + data->counters.readLatencyBands.addMeasurement(duration, resultSize > maxReadBytes); } return Void(); @@ -1585,11 +1590,13 @@ ACTOR Future getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req ) ++data->counters.finishedQueries; --data->readQueueSizeMetric; + double duration = timer() - req.requestTime(); + data->counters.readLatencySample.addMeasurement(duration); if(data->latencyBandConfig.present()) { int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits::max()); int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits::max()); data->counters.readLatencyBands.addMeasurement( - timer() - req.requestTime(), resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset || + duration, resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset || abs(req.end.offset) > maxSelectorOffset); } @@ -1648,11 +1655,14 @@ ACTOR Future getKeyQ( StorageServer* data, GetKeyRequest req ) { ++data->counters.finishedQueries; --data->readQueueSizeMetric; + + double duration = timer() - req.requestTime(); + data->counters.readLatencySample.addMeasurement(duration); if(data->latencyBandConfig.present()) { int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits::max()); int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits::max()); data->counters.readLatencyBands.addMeasurement( - timer() - req.requestTime(), resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset); + duration, resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset); } return Void(); diff --git a/flow/Arena.cpp b/flow/Arena.cpp index 5e635640df..dc3b79d885 100644 --- a/flow/Arena.cpp +++ b/flow/Arena.cpp @@ -20,8 +20,98 @@ #include "Arena.h" +// See https://dox.ipxe.org/memcheck_8h_source.html and https://dox.ipxe.org/valgrind_8h_source.html for an explanation +// of valgrind client requests +#ifdef VALGRIND_ARENA +#include +#else +// Since VALGRIND_ARENA is not set, we don't want to pay the performance penalty for precise tracking of arenas. We'll +// make these macros noops just for this translation unit. +#undef VALGRIND_MAKE_MEM_NOACCESS +#undef VALGRIND_MAKE_MEM_DEFINED +#undef VALGRIND_MAKE_MEM_UNDEFINED +#define VALGRIND_MAKE_MEM_NOACCESS(addr, size) ((void)(addr), (void)(size)) +#define VALGRIND_MAKE_MEM_DEFINED(addr, size) ((void)(addr), (void)(size)) +#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size) ((void)(addr), (void)(size)) +#endif + +// For each use of arena-internal memory (e.g. ArenaBlock::getSize()), unpoison the memory before use and +// poison it when done. +// When creating a new ArenaBlock, poison the memory that will be later allocated to users. +// When allocating memory to a user, mark that memory as undefined. + +namespace { +void allow_access(ArenaBlock* b) { + if (b) { + VALGRIND_MAKE_MEM_DEFINED(b, ArenaBlock::TINY_HEADER); + int headerSize = b->isTiny() ? ArenaBlock::TINY_HEADER : sizeof(ArenaBlock); + VALGRIND_MAKE_MEM_DEFINED(b, headerSize); + } +} +void disallow_access(ArenaBlock* b) { + if (b) { + int headerSize = b->isTiny() ? ArenaBlock::TINY_HEADER : sizeof(ArenaBlock); + VALGRIND_MAKE_MEM_NOACCESS(b, headerSize); + } +} +} // namespace + +Arena::Arena() : impl(NULL) {} +Arena::Arena(size_t reservedSize) : impl(0) { + UNSTOPPABLE_ASSERT(reservedSize < std::numeric_limits::max()); + if (reservedSize) { + allow_access(impl.getPtr()); + ArenaBlock::create((int)reservedSize, impl); + disallow_access(impl.getPtr()); + } +} +Arena::Arena(const Arena& r) = default; +Arena::Arena(Arena&& r) noexcept = default; +Arena& Arena::operator=(const Arena& r) = default; +Arena& Arena::operator=(Arena&& r) noexcept = default; +void Arena::dependsOn(const Arena& p) { + if (p.impl) { + allow_access(impl.getPtr()); + allow_access(p.impl.getPtr()); + ArenaBlock::dependOn(impl, p.impl.getPtr()); + disallow_access(p.impl.getPtr()); + if (p.impl.getPtr() != impl.getPtr()) { + disallow_access(impl.getPtr()); + } + } +} +size_t Arena::getSize() const { + if (impl) { + allow_access(impl.getPtr()); + auto result = impl->totalSize(); + disallow_access(impl.getPtr()); + return result; + } + return 0; +} +bool Arena::hasFree(size_t size, const void* address) { + if (impl) { + allow_access(impl.getPtr()); + auto result = impl->unused() >= size && impl->getNextData() == address; + disallow_access(impl.getPtr()); + return result; + } + return false; +} + +void ArenaBlock::addref() { + VALGRIND_MAKE_MEM_DEFINED(this, sizeof(ThreadSafeReferenceCounted)); + ThreadSafeReferenceCounted::addref(); + VALGRIND_MAKE_MEM_NOACCESS(this, sizeof(ThreadSafeReferenceCounted)); +} + void ArenaBlock::delref() { - if (delref_no_destroy()) destroy(); + VALGRIND_MAKE_MEM_DEFINED(this, sizeof(ThreadSafeReferenceCounted)); + if (delref_no_destroy()) { + destroy(); + } else { + VALGRIND_MAKE_MEM_NOACCESS(this, sizeof(ThreadSafeReferenceCounted)); + } } bool ArenaBlock::isTiny() const { @@ -52,14 +142,20 @@ const void* ArenaBlock::getNextData() const { return (const uint8_t*)getData() + used(); } size_t ArenaBlock::totalSize() { - if (isTiny()) return size(); + if (isTiny()) { + return size(); + } size_t s = size(); int o = nextBlockOffset; while (o) { ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + o); + VALGRIND_MAKE_MEM_DEFINED(r, sizeof(ArenaBlockRef)); + allow_access(r->next); s += r->next->totalSize(); + disallow_access(r->next); o = r->nextBlockOffset; + VALGRIND_MAKE_MEM_NOACCESS(r, sizeof(ArenaBlockRef)); } return s; } @@ -71,8 +167,10 @@ void ArenaBlock::getUniqueBlocks(std::set& a) { int o = nextBlockOffset; while (o) { ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + o); + VALGRIND_MAKE_MEM_DEFINED(r, sizeof(ArenaBlockRef)); r->next->getUniqueBlocks(a); o = r->nextBlockOffset; + VALGRIND_MAKE_MEM_NOACCESS(r, sizeof(ArenaBlockRef)); } return; } @@ -91,8 +189,10 @@ int ArenaBlock::addUsed(int bytes) { void ArenaBlock::makeReference(ArenaBlock* next) { ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + bigUsed); + VALGRIND_MAKE_MEM_DEFINED(r, sizeof(ArenaBlockRef)); r->next = next; r->nextBlockOffset = nextBlockOffset; + VALGRIND_MAKE_MEM_NOACCESS(r, sizeof(ArenaBlockRef)); nextBlockOffset = bigUsed; bigUsed += sizeof(ArenaBlockRef); } @@ -107,9 +207,17 @@ void ArenaBlock::dependOn(Reference& self, ArenaBlock* other) { void* ArenaBlock::allocate(Reference& self, int bytes) { ArenaBlock* b = self.getPtr(); - if (!self || self->unused() < bytes) b = create(bytes, self); + allow_access(b); + if (!self || self->unused() < bytes) { + auto* tmp = b; + b = create(bytes, self); + disallow_access(tmp); + } - return (char*)b->getData() + b->addUsed(bytes); + void* result = (char*)b->getData() + b->addUsed(bytes); + disallow_access(b); + VALGRIND_MAKE_MEM_UNDEFINED(result, bytes); + return result; } // Return an appropriately-sized ArenaBlock to store the given data @@ -205,6 +313,7 @@ ArenaBlock* ArenaBlock::create(int dataSize, Reference& next) { } b->setrefCountUnsafe(1); next.setPtrUnsafe(b); + VALGRIND_MAKE_MEM_NOACCESS(reinterpret_cast(b) + b->used(), b->unused()); return b; } @@ -212,18 +321,23 @@ void ArenaBlock::destroy() { // If the stack never contains more than one item, nothing will be allocated from stackArena. // If stackArena is used, it will always be a linked list, so destroying *it* will not create another arena ArenaBlock* tinyStack = this; + allow_access(this); Arena stackArena; VectorRef stack(&tinyStack, 1); while (stack.size()) { ArenaBlock* b = stack.end()[-1]; stack.pop_back(); + allow_access(b); if (!b->isTiny()) { int o = b->nextBlockOffset; while (o) { ArenaBlockRef* br = (ArenaBlockRef*)((char*)b->getData() + o); + VALGRIND_MAKE_MEM_DEFINED(br, sizeof(ArenaBlockRef)); + allow_access(br->next); if (br->next->delref_no_destroy()) stack.push_back(stackArena, br->next); + disallow_access(br->next); o = br->nextBlockOffset; } } diff --git a/flow/Arena.h b/flow/Arena.h index 7673852db7..1b8e026686 100644 --- a/flow/Arena.h +++ b/flow/Arena.h @@ -90,23 +90,28 @@ class NonCopyable NonCopyable & operator = (const NonCopyable &); }; +// An Arena is a custom allocator that consists of a set of ArenaBlocks. Allocation is performed by bumping a pointer +// on the most recent ArenaBlock until the block is unable to service the next allocation request. When the current +// ArenaBlock is full, a new (larger) one is added to the Arena. Deallocation is not directly supported. Instead, +// memory is freed by deleting the entire Arena at once. See flow/README.md for details on using Arenas. class Arena { public: - inline Arena(); - inline explicit Arena( size_t reservedSize ); + Arena(); + explicit Arena(size_t reservedSize); //~Arena(); Arena(const Arena&); Arena(Arena && r) BOOST_NOEXCEPT; Arena& operator=(const Arena&); Arena& operator=(Arena&&) BOOST_NOEXCEPT; - inline void dependsOn( const Arena& p ); - inline size_t getSize() const; + void dependsOn(const Arena& p); + size_t getSize() const; - inline bool hasFree( size_t size, const void *address ); + bool hasFree(size_t size, const void* address); friend void* operator new ( size_t size, Arena& p ); friend void* operator new[] ( size_t size, Arena& p ); + //private: Reference impl; }; @@ -144,6 +149,7 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted uint32_t bigSize, bigUsed; // include block header uint32_t nextBlockOffset; + void addref(); void delref(); bool isTiny() const; int size() const; @@ -167,28 +173,6 @@ private: static void* operator new(size_t s); // not implemented }; -inline Arena::Arena() : impl( NULL ) {} -inline Arena::Arena(size_t reservedSize) : impl( 0 ) { - UNSTOPPABLE_ASSERT( reservedSize < std::numeric_limits::max() ); - if (reservedSize) - ArenaBlock::create((int)reservedSize,impl); -} -inline Arena::Arena( const Arena& r ) : impl( r.impl ) {} -inline Arena::Arena(Arena && r) BOOST_NOEXCEPT : impl(std::move(r.impl)) {} -inline Arena& Arena::operator=(const Arena& r) { - impl = r.impl; - return *this; -} -inline Arena& Arena::operator=(Arena&& r) BOOST_NOEXCEPT { - impl = std::move(r.impl); - return *this; -} -inline void Arena::dependsOn( const Arena& p ) { - if (p.impl) - ArenaBlock::dependOn( impl, p.impl.getPtr() ); -} -inline size_t Arena::getSize() const { return impl ? impl->totalSize() : 0; } -inline bool Arena::hasFree( size_t size, const void *address ) { return impl && impl->unused() >= size && impl->getNextData() == address; } inline void* operator new ( size_t size, Arena& p ) { UNSTOPPABLE_ASSERT( size < std::numeric_limits::max() ); return ArenaBlock::allocate( p.impl, (int)size ); diff --git a/flow/CMakeLists.txt b/flow/CMakeLists.txt index 6c3b8eca24..c60f990df0 100644 --- a/flow/CMakeLists.txt +++ b/flow/CMakeLists.txt @@ -46,8 +46,6 @@ set(FLOW_SRCS SignalSafeUnwind.cpp SignalSafeUnwind.h SimpleOpt.h - Stats.actor.cpp - Stats.h SystemMonitor.cpp SystemMonitor.h TDMetric.actor.h diff --git a/flow/FastAlloc.cpp b/flow/FastAlloc.cpp index 47846bc76d..33ff8446f9 100644 --- a/flow/FastAlloc.cpp +++ b/flow/FastAlloc.cpp @@ -333,6 +333,9 @@ void FastAllocator::release(void *ptr) { ASSERT(!thr.freelist == (thr.count == 0)); // freelist is empty if and only if count is 0 +#if VALGRIND + VALGRIND_MAKE_MEM_DEFINED(ptr, sizeof(void*)); +#endif ++thr.count; *(void**)ptr = thr.freelist; //check(ptr, false); diff --git a/flow/actorcompiler/ActorCompiler.cs b/flow/actorcompiler/ActorCompiler.cs index 06861a2044..b02d8d5c0a 100644 --- a/flow/actorcompiler/ActorCompiler.cs +++ b/flow/actorcompiler/ActorCompiler.cs @@ -615,7 +615,7 @@ namespace actorcompiler { LineNumber(cx.target, stmt.FirstSourceLine); if (stmt.decl.initializerConstructorSyntax || stmt.decl.initializer=="") - cx.target.WriteLine("{0} = std::move( {1}({2}) );", stmt.decl.name, stmt.decl.type, stmt.decl.initializer); + cx.target.WriteLine("{0} = {1}({2});", stmt.decl.name, stmt.decl.type, stmt.decl.initializer); else cx.target.WriteLine("{0} = {1};", stmt.decl.name, stmt.decl.initializer); }