fixed a performance regression related to broadcasting a read version to too many transactions simultaneously

This commit is contained in:
Evan Tschannen 2019-03-22 16:05:20 -07:00
parent 6905db816e
commit efbcd18987
4 changed files with 26 additions and 5 deletions

View File

@ -62,6 +62,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
init( BROADCAST_BATCH_SIZE, 20 ); if( randomize && BUGGIFY ) BROADCAST_BATCH_SIZE = 1;
init( LOCATION_CACHE_EVICTION_SIZE, 300000 );
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;

View File

@ -60,6 +60,7 @@ public:
int MAX_BATCH_SIZE;
double GRV_BATCH_TIMEOUT;
int BROADCAST_BATCH_SIZE;
// When locationCache in DatabaseContext gets to be this size, items will be evicted
int LOCATION_CACHE_EVICTION_SIZE;

View File

@ -2927,9 +2927,9 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
addActor.send(timeReply(GRVReply.getFuture(), replyTimes));
Future<Void> batch =
broadcast(
incrementalBroadcast(
getConsistentReadVersion(cx, count, flags, std::move(debugID)),
std::vector< Promise<GetReadVersionReply> >(std::move(requests)));
std::vector< Promise<GetReadVersionReply> >(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
debugID = Optional<UID>();
requests = std::vector< Promise<GetReadVersionReply> >();
addActor.send(batch);

View File

@ -112,8 +112,6 @@ void forwardPromise( PromiseStream<T> output, Future<T> input ) {
}
}
ACTOR template <class T> Future<Void> broadcast(Future<T> input, std::vector<Promise<T>> output) {
T value = wait(input);
for (int i = 0; i<output.size(); i++)
@ -128,8 +126,29 @@ ACTOR template <class T> Future<Void> broadcast( Future<T> input, std::vector<Re
return Void();
}
ACTOR template <class T> Future<Void> incrementalBroadcast(Future<T> input, std::vector<Promise<T>> output, int batchSize) {
state T value = wait(input);
state int i = 0;
for (; i<output.size(); i++) {
output[i].send(value);
if((i+1)%batchSize==0) {
wait(delay(0));
}
}
return Void();
}
ACTOR template <class T> Future<Void> incrementalBroadcast( Future<T> input, std::vector<ReplyPromise<T>> output, int batchSize) {
state T value = wait( input );
state int i = 0;
for(; i<output.size(); i++) {
output[i].send(value);
if((i+1)%batchSize==0) {
wait(delay(0));
}
}
return Void();
}
// Needed for the call to endpointNotFound()
#include "fdbrpc/FailureMonitor.h"