Get read versions requests must be load balanced on the number of requests because ratekeeper gives out an equal budget to each proxy

This commit is contained in:
Evan Tschannen 2020-10-04 16:20:24 -07:00
parent 48ed8861f6
commit 614c8bc895
8 changed files with 74 additions and 23 deletions

View File

@ -139,8 +139,8 @@ public:
bool sampleReadTags();
Reference<ProxyInfo> getMasterProxies(bool useProvisionalProxies);
Future<Reference<ProxyInfo>> getMasterProxiesFuture(bool useProvisionalProxies);
Reference<ProxyInfo> getMasterProxies(bool useProvisionalProxies, bool useGrvProxies = false);
Future<Reference<ProxyInfo>> getMasterProxiesFuture(bool useProvisionalProxies, bool useGrvProxies = false);
Future<Void> onMasterProxiesChanged();
Future<HealthMetrics> getHealthMetrics(bool detailed);
@ -193,6 +193,7 @@ public:
AsyncTrigger masterProxiesChangeTrigger;
Future<Void> monitorMasterProxiesInfoChange;
Reference<ProxyInfo> masterProxies;
Reference<ProxyInfo> grvProxies;
bool provisional;
UID masterProxiesLastChange;
LocalityData clientLocality;

View File

@ -905,8 +905,10 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
break;
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies ) );
if( clientInfo->get().proxies.size() ) {
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, false ) );
grvProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, true ) );
}
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -915,8 +917,10 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
break;
case FDBDatabaseOptions::DATACENTER_ID:
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies ));
if( clientInfo->get().proxies.size() ) {
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, false ) );
grvProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, true ) );
}
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -958,6 +962,7 @@ ACTOR static Future<Void> switchConnectionFileImpl(Reference<ClusterConnectionFi
// Reset state from former cluster.
self->masterProxies.clear();
self->grvProxies.clear();
self->minAcceptableReadVersion = std::numeric_limits<Version>::max();
self->invalidateCache(allKeys);
@ -1298,25 +1303,30 @@ void stopNetwork() {
closeTraceFile();
}
Reference<ProxyInfo> DatabaseContext::getMasterProxies(bool useProvisionalProxies) {
Reference<ProxyInfo> DatabaseContext::getMasterProxies(bool useProvisionalProxies, bool useGrvProxies) {
if (masterProxiesLastChange != clientInfo->get().id) {
masterProxiesLastChange = clientInfo->get().id;
masterProxies.clear();
grvProxies.clear();
if( clientInfo->get().proxies.size() ) {
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies ));
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, false ) );
grvProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, true ) );
provisional = clientInfo->get().proxies[0].provisional;
}
}
if(provisional && !useProvisionalProxies) {
return Reference<ProxyInfo>();
}
if(useGrvProxies) {
return grvProxies;
}
return masterProxies;
}
//Actor which will wait until the MultiInterface<MasterProxyInterface> returned by the DatabaseContext cx is not NULL
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx, bool useProvisionalProxies) {
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx, bool useProvisionalProxies, bool useGrvProxies) {
loop{
Reference<ProxyInfo> proxies = cx->getMasterProxies(useProvisionalProxies);
Reference<ProxyInfo> proxies = cx->getMasterProxies(useProvisionalProxies, useGrvProxies);
if (proxies)
return proxies;
wait( cx->onMasterProxiesChanged() );
@ -3467,7 +3477,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID );
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES, true), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
if(tags.size() != 0) {
auto &priorityThrottledTags = cx->throttledTags[priority];
for(auto& tag : tags) {

View File

@ -81,7 +81,7 @@ struct AlternativeInfo {
template <class T>
class ModelInterface : public ReferenceCounted<ModelInterface<T>> {
public:
ModelInterface( const vector<T>& v ) {
ModelInterface( const vector<T>& v, bool balanceOnRequests ) : balanceOnRequests(balanceOnRequests) {
for(int i = 0; i < v.size(); i++) {
alternatives.push_back(AlternativeInfo(v[i], 1.0/v.size(), (i+1.0)/v.size()));
}
@ -106,22 +106,24 @@ public:
}
void updateProbabilities() {
double totalBusyTime = 0;
double totalBusy = 0;
for(auto& it : alternatives) {
totalBusyTime += it.processBusyTime;
int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION : it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
totalBusy += busyMetric;
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
return;
}
}
//Do not update probabilities if the average proxy busyness is less than 5%
if(totalBusyTime < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_AMOUNT*alternatives.size()) {
if((balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_REQUESTS*alternatives.size()) ||
(!balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_CPU*alternatives.size())) {
return;
}
double totalProbability = 0;
for(auto& it : alternatives) {
it.probability += (1.0/alternatives.size()-(it.processBusyTime/totalBusyTime))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION : it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
it.probability += (1.0/alternatives.size()-(busyMetric/totalBusy))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
totalProbability += it.probability;
@ -155,6 +157,7 @@ public:
private:
vector<AlternativeInfo<T>> alternatives;
Future<Void> updater;
bool balanceOnRequests;
};
template <class T>

View File

@ -425,7 +425,7 @@ struct BackupData {
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
loop {
GetReadVersionRequest request(1, TransactionPriority::DEFAULT,
GetReadVersionRequest request(0, TransactionPriority::DEFAULT,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onMasterProxiesChanged())) {}

View File

@ -96,8 +96,34 @@ struct ProxyStats {
Future<Void> logger;
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
void updateRequestBuckets() {
while(now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
void addRequest(int transactionCount) {
updateRequestBuckets();
recentRequests += transactionCount;
requestBuckets.back() += transactionCount;
}
int getRecentRequests() {
updateRequestBuckets();
return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()),
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
@ -123,6 +149,9 @@ struct ProxyStats {
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};
@ -291,6 +320,7 @@ ACTOR Future<Void> queueTransactionStartRequests(
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
} else {
stats->addRequest(req.transactionCount);
// TODO: check whether this is reasonable to do in the fast path
for(auto tag : req.tags) {
(*transactionTagCounter)[tag.first] += tag.second;
@ -1409,7 +1439,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
rep = v;
}
}
rep.processBusyTime = 1e6 * (g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness);
rep.processBusyTime = FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*std::min((std::numeric_limits<int>::max()/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION)-1,commitData->stats.getRecentRequests());
rep.processBusyTime += FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*(g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");

View File

@ -214,7 +214,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 10.0 ); //should be longer than the rate we log network metrics
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.10 );
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
init( BASIC_LOAD_BALANCE_MIN_AMOUNT, 50000 ); //Will not update probabilities if the average proxy busyness is less than 5%
init( BASIC_LOAD_BALANCE_MIN_REQUESTS, 50 );
init( BASIC_LOAD_BALANCE_MIN_CPU, 0.05 );
init( BASIC_LOAD_BALANCE_BUCKETS, 40 );
init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 1000 );
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -233,7 +233,10 @@ public:
double BASIC_LOAD_BALANCE_UPDATE_RATE;
double BASIC_LOAD_BALANCE_MAX_CHANGE;
double BASIC_LOAD_BALANCE_MAX_PROB;
double BASIC_LOAD_BALANCE_MIN_AMOUNT;
int BASIC_LOAD_BALANCE_BUCKETS;
int BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
double BASIC_LOAD_BALANCE_MIN_REQUESTS;
double BASIC_LOAD_BALANCE_MIN_CPU;
// Health Monitor
int FAILURE_DETECTION_DELAY;

View File

@ -341,7 +341,7 @@ struct NetworkMetrics {
static const std::vector<int> starvationBins;
NetworkMetrics() {
NetworkMetrics() : lastRunLoopBusyness(0) {
for(int priority : starvationBins) {
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
}