diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 7e50f82516..62df0225ec 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -139,8 +139,8 @@ public: bool sampleReadTags(); - Reference getMasterProxies(bool useProvisionalProxies); - Future> getMasterProxiesFuture(bool useProvisionalProxies); + Reference getMasterProxies(bool useProvisionalProxies, bool useGrvProxies = false); + Future> getMasterProxiesFuture(bool useProvisionalProxies, bool useGrvProxies = false); Future onMasterProxiesChanged(); Future getHealthMetrics(bool detailed); @@ -193,6 +193,7 @@ public: AsyncTrigger masterProxiesChangeTrigger; Future monitorMasterProxiesInfoChange; Reference masterProxies; + Reference grvProxies; bool provisional; UID masterProxiesLastChange; LocalityData clientLocality; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 0137a60c45..9f859da19e 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -905,8 +905,10 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional(value.get()) : Optional>(), clientLocality.machineId(), clientLocality.dcId() ); - if( clientInfo->get().proxies.size() ) - masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies ) ); + if( clientInfo->get().proxies.size() ) { + masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, false ) ); + grvProxies = Reference( new ProxyInfo( clientInfo->get().proxies, true ) ); + } server_interf.clear(); locationCache.insert( allKeys, Reference() ); break; @@ -915,8 +917,10 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional(value.get()) : Optional>()); - if( clientInfo->get().proxies.size() ) - masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies )); + if( clientInfo->get().proxies.size() ) { + masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, false ) ); + grvProxies = Reference( new ProxyInfo( clientInfo->get().proxies, true ) ); + } server_interf.clear(); locationCache.insert( allKeys, Reference() ); break; @@ -958,6 +962,7 @@ ACTOR static Future switchConnectionFileImpl(ReferencemasterProxies.clear(); + self->grvProxies.clear(); self->minAcceptableReadVersion = std::numeric_limits::max(); self->invalidateCache(allKeys); @@ -1298,25 +1303,30 @@ void stopNetwork() { closeTraceFile(); } -Reference DatabaseContext::getMasterProxies(bool useProvisionalProxies) { +Reference DatabaseContext::getMasterProxies(bool useProvisionalProxies, bool useGrvProxies) { if (masterProxiesLastChange != clientInfo->get().id) { masterProxiesLastChange = clientInfo->get().id; masterProxies.clear(); + grvProxies.clear(); if( clientInfo->get().proxies.size() ) { - masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies )); + masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, false ) ); + grvProxies = Reference( new ProxyInfo( clientInfo->get().proxies, true ) ); provisional = clientInfo->get().proxies[0].provisional; } } if(provisional && !useProvisionalProxies) { return Reference(); } + if(useGrvProxies) { + return grvProxies; + } return masterProxies; } //Actor which will wait until the MultiInterface returned by the DatabaseContext cx is not NULL -ACTOR Future> getMasterProxiesFuture(DatabaseContext *cx, bool useProvisionalProxies) { +ACTOR Future> getMasterProxiesFuture(DatabaseContext *cx, bool useProvisionalProxies, bool useGrvProxies) { loop{ - Reference proxies = cx->getMasterProxies(useProvisionalProxies); + Reference proxies = cx->getMasterProxies(useProvisionalProxies, useGrvProxies); if (proxies) return proxies; wait( cx->onMasterProxiesChanged() ); @@ -3467,7 +3477,7 @@ ACTOR Future getConsistentReadVersion( DatabaseContext *cx, state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID ); choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) { + when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES, true), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) { if(tags.size() != 0) { auto &priorityThrottledTags = cx->throttledTags[priority]; for(auto& tag : tags) { diff --git a/fdbrpc/MultiInterface.h b/fdbrpc/MultiInterface.h index 1535c6ef03..171f78f7e9 100644 --- a/fdbrpc/MultiInterface.h +++ b/fdbrpc/MultiInterface.h @@ -81,7 +81,7 @@ struct AlternativeInfo { template class ModelInterface : public ReferenceCounted> { public: - ModelInterface( const vector& v ) { + ModelInterface( const vector& v, bool balanceOnRequests ) : balanceOnRequests(balanceOnRequests) { for(int i = 0; i < v.size(); i++) { alternatives.push_back(AlternativeInfo(v[i], 1.0/v.size(), (i+1.0)/v.size())); } @@ -106,22 +106,24 @@ public: } void updateProbabilities() { - double totalBusyTime = 0; + double totalBusy = 0; for(auto& it : alternatives) { - totalBusyTime += it.processBusyTime; + int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION : it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION; + totalBusy += busyMetric; if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) { return; } } - //Do not update probabilities if the average proxy busyness is less than 5% - if(totalBusyTime < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_AMOUNT*alternatives.size()) { + if((balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_REQUESTS*alternatives.size()) || + (!balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_CPU*alternatives.size())) { return; } double totalProbability = 0; for(auto& it : alternatives) { - it.probability += (1.0/alternatives.size()-(it.processBusyTime/totalBusyTime))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE; + int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION : it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION; + it.probability += (1.0/alternatives.size()-(busyMetric/totalBusy))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE; it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size())); it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size()); totalProbability += it.probability; @@ -155,6 +157,7 @@ public: private: vector> alternatives; Future updater; + bool balanceOnRequests; }; template diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index c0113e0aaa..bf71892934 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -425,7 +425,7 @@ struct BackupData { ACTOR static Future _getMinKnownCommittedVersion(BackupData* self) { loop { - GetReadVersionRequest request(1, TransactionPriority::DEFAULT, + GetReadVersionRequest request(0, TransactionPriority::DEFAULT, GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION); choose { when(wait(self->cx->onMasterProxiesChanged())) {} diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index a9645e4280..663a521042 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -96,8 +96,34 @@ struct ProxyStats { Future logger; + int recentRequests; + Deque requestBuckets; + double lastBucketBegin; + double bucketInterval; + + void updateRequestBuckets() { + while(now() - lastBucketBegin > bucketInterval) { + lastBucketBegin += bucketInterval; + recentRequests -= requestBuckets.front(); + requestBuckets.pop_front(); + requestBuckets.push_back(0); + } + } + + void addRequest(int transactionCount) { + updateRequestBuckets(); + recentRequests += transactionCount; + requestBuckets.back() += transactionCount; + } + + int getRecentRequests() { + updateRequestBuckets(); + return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now())); + } + explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr) - : cc("ProxyStats", id.toString()), + : cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), + bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), @@ -123,6 +149,9 @@ struct ProxyStats { specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); }); specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; }); logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics"); + for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) { + requestBuckets.push_back(0); + } } }; @@ -291,6 +320,7 @@ ACTOR Future queueTransactionStartRequests( req.reply.send(rep); TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60); } else { + stats->addRequest(req.transactionCount); // TODO: check whether this is reasonable to do in the fast path for(auto tag : req.tags) { (*transactionTagCounter)[tag.first] += tag.second; @@ -1409,7 +1439,8 @@ ACTOR Future getLiveCommittedVersion(ProxyCommitData* commi rep = v; } } - rep.processBusyTime = 1e6 * (g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness); + rep.processBusyTime = FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*std::min((std::numeric_limits::max()/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION)-1,commitData->stats.getRecentRequests()); + rep.processBusyTime += FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*(g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness); if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After"); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 5019923e8c..7cce22b0ac 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -214,7 +214,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) { init( BASIC_LOAD_BALANCE_UPDATE_RATE, 10.0 ); //should be longer than the rate we log network metrics init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.10 ); init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 ); - init( BASIC_LOAD_BALANCE_MIN_AMOUNT, 50000 ); //Will not update probabilities if the average proxy busyness is less than 5% + init( BASIC_LOAD_BALANCE_MIN_REQUESTS, 50 ); + init( BASIC_LOAD_BALANCE_MIN_CPU, 0.05 ); + init( BASIC_LOAD_BALANCE_BUCKETS, 40 ); + init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 1000 ); // Health Monitor init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0; diff --git a/flow/Knobs.h b/flow/Knobs.h index bb719c2686..ff95df21fd 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -233,7 +233,10 @@ public: double BASIC_LOAD_BALANCE_UPDATE_RATE; double BASIC_LOAD_BALANCE_MAX_CHANGE; double BASIC_LOAD_BALANCE_MAX_PROB; - double BASIC_LOAD_BALANCE_MIN_AMOUNT; + int BASIC_LOAD_BALANCE_BUCKETS; + int BASIC_LOAD_BALANCE_COMPUTE_PRECISION; + double BASIC_LOAD_BALANCE_MIN_REQUESTS; + double BASIC_LOAD_BALANCE_MIN_CPU; // Health Monitor int FAILURE_DETECTION_DELAY; diff --git a/flow/network.h b/flow/network.h index 5aa083185b..d2efa76d7a 100644 --- a/flow/network.h +++ b/flow/network.h @@ -341,7 +341,7 @@ struct NetworkMetrics { static const std::vector starvationBins; - NetworkMetrics() { + NetworkMetrics() : lastRunLoopBusyness(0) { for(int priority : starvationBins) { starvationTrackers.emplace_back(static_cast(priority)); }