SysTester: Load balance over multiple databases

This commit is contained in:
Vaidas Gasiunas 2022-02-21 18:54:51 +01:00
parent 11527a18d3
commit 45d0815218
4 changed files with 50 additions and 26 deletions

View File

@ -46,6 +46,7 @@ public:
int api_version = FDB_API_VERSION;
bool blockOnFutures = false;
int numClientThreads = 1;
int numDatabases = 1;
bool parseArgs(int argc, char** argv);

View File

@ -19,6 +19,7 @@
*/
#include "SysTestTransactionExecutor.h"
#include "flow/IRandom.h"
#include <iostream>
#include <cassert>
@ -137,28 +138,37 @@ private:
class TransactionExecutor : public ITransactionExecutor {
public:
TransactionExecutor() : db(nullptr), scheduler(nullptr) {}
TransactionExecutor() : scheduler(nullptr) {}
~TransactionExecutor() { release(); }
void init(IScheduler* scheduler, const char* clusterFile, const TransactionExecutorOptions& options) override {
this->scheduler = scheduler;
this->options = options;
fdb_check(fdb_create_database(clusterFile, &db));
for (int i = 0; i < options.numDatabases; i++) {
FDBDatabase* db;
fdb_check(fdb_create_database(clusterFile, &db));
databases.push_back(db);
}
}
void execute(ITransactionActor* txActor, TTaskFct cont) override {
int idx = deterministicRandom()->randomInt(0, options.numDatabases);
FDBTransaction* tx;
fdb_check(fdb_database_create_transaction(db, &tx));
fdb_check(fdb_database_create_transaction(databases[idx], &tx));
TransactionContext* ctx = new TransactionContext(tx, txActor, cont, options, scheduler);
txActor->init(ctx);
txActor->start();
}
void release() override { fdb_database_destroy(db); }
void release() override {
for (FDBDatabase* db : databases) {
fdb_database_destroy(db);
}
}
private:
FDBDatabase* db;
std::vector<FDBDatabase*> databases;
TransactionExecutorOptions options;
IScheduler* scheduler;
};

View File

@ -65,6 +65,7 @@ private:
struct TransactionExecutorOptions {
std::string prefix = "";
bool blockOnFutures = false;
int numDatabases = 1;
};
class ITransactionExecutor {

View File

@ -45,7 +45,8 @@ enum TesterOptionId {
OPT_KNOB,
OPT_API_VERSION,
OPT_BLOCK_ON_FUTURES,
OPT_NUM_CLIENT_THREADS
OPT_NUM_CLIENT_THREADS,
OPT_NUM_DATABASES
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -60,7 +61,8 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_KNOB, "--knob-", SO_REQ_SEP },
{ OPT_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_BLOCK_ON_FUTURES, "--block-on-futures", SO_NONE },
{ OPT_NUM_CLIENT_THREADS, "--num-client-threads", SO_REQ_SEP } };
{ OPT_NUM_CLIENT_THREADS, "--num-client-threads", SO_REQ_SEP },
{ OPT_NUM_DATABASES, "--num-databases", SO_REQ_SEP } };
} // namespace
@ -91,6 +93,8 @@ void TesterOptions::printProgramUsage(const char* execName) {
" Use blocking waits on futures instead of scheduling callbacks.\n"
" --num-client-threads NUM_THREADS\n"
" Number of threads to be used for execution of client workloads.\n"
" --num-databases NUM_DB\n"
" Number of database connections to be used concurrently.\n"
" -h, --help Display this help and exit.\n");
}
@ -118,25 +122,32 @@ bool TesterOptions::parseArgs(int argc, char** argv) {
return true;
}
namespace {
bool processIntArg(const CSimpleOpt& args, int& res, int minVal, int maxVal) {
char* endptr;
res = strtol(args.OptionArg(), &endptr, 10);
if (*endptr != '\0') {
fprintf(stderr, "ERROR: invalid value %s for %s\n", args.OptionArg(), args.OptionText());
return false;
}
if (res < minVal || res > maxVal) {
fprintf(stderr, "ERROR: value for %s must be between %d and %d\n", args.OptionText(), minVal, maxVal);
return false;
}
return true;
}
} // namespace
bool TesterOptions::processArg(const CSimpleOpt& args) {
switch (args.OptionId()) {
case OPT_CONNFILE:
clusterFile = args.OptionArg();
break;
case OPT_API_VERSION: {
char* endptr;
api_version = strtoul((char*)args.OptionArg(), &endptr, 10);
if (*endptr != '\0') {
fprintf(stderr, "ERROR: invalid client version %s\n", args.OptionArg());
return false;
} else if (api_version < 700 || api_version > FDB_API_VERSION) {
// multi-version fdbcli only available after 7.0
fprintf(stderr,
"ERROR: api version %s is not supported. (Min: 700, Max: %d)\n",
args.OptionArg(),
FDB_API_VERSION);
return false;
}
// multi-version fdbcli only available after 7.0
processIntArg(args, api_version, 700, FDB_API_VERSION);
break;
}
case OPT_TRACE:
@ -168,12 +179,11 @@ bool TesterOptions::processArg(const CSimpleOpt& args) {
break;
case OPT_NUM_CLIENT_THREADS:
char* endptr;
numClientThreads = strtoul((char*)args.OptionArg(), &endptr, 10);
if (*endptr != '\0' || numClientThreads <= 0 || numClientThreads > 1000) {
fprintf(stderr, "ERROR: number of threads %s\n", args.OptionArg());
return false;
}
processIntArg(args, numClientThreads, 1, 1000);
break;
case OPT_NUM_DATABASES:
processIntArg(args, numDatabases, 1, 1000);
break;
}
return true;
@ -197,6 +207,7 @@ using namespace FDBSystemTester;
void runApiCorrectness(TesterOptions& options) {
TransactionExecutorOptions txExecOptions;
txExecOptions.blockOnFutures = options.blockOnFutures;
txExecOptions.numDatabases = options.numDatabases;
IScheduler* scheduler = createScheduler(options.numClientThreads);
ITransactionExecutor* txExecutor = createTransactionExecutor();
@ -206,6 +217,7 @@ void runApiCorrectness(TesterOptions& options) {
workload->init(txExecutor, scheduler, [scheduler]() { scheduler->stop(); });
workload->start();
scheduler->join();
delete workload;
delete txExecutor;
delete scheduler;