SysTester: Use reference counting for transactions and futures; Define transaction workflow by lambdas

This commit is contained in:
Vaidas Gasiunas 2022-02-24 13:59:12 +01:00
parent b61adc10be
commit de46144af7
7 changed files with 74 additions and 102 deletions

View File

@ -34,56 +34,45 @@ void fdb_check(fdb_error_t e) {
} // namespace
Future::~Future() {
if (future_) {
fdb_future_destroy(future_);
}
}
Future::Future(FDBFuture* f) : future_(f, fdb_future_destroy) {}
void Future::reset() {
if (future_) {
fdb_future_destroy(future_);
future_ = nullptr;
}
future_.reset();
}
fdb_error_t Future::getError() {
return fdb_future_get_error(future_);
fdb_error_t Future::getError() const {
return fdb_future_get_error(future_.get());
}
std::optional<std::string_view> ValueFuture::getValue() {
std::optional<std::string_view> ValueFuture::getValue() const {
int out_present;
const std::uint8_t* val;
int vallen;
fdb_check(fdb_future_get_value(future_, &out_present, &val, &vallen));
fdb_check(fdb_future_get_value(future_.get(), &out_present, &val, &vallen));
return out_present ? std::make_optional(std::string((const char*)val, vallen)) : std::nullopt;
}
// Given an FDBDatabase, initializes a new transaction.
Transaction::Transaction(FDBTransaction* tx) : tx_(tx) {}
Transaction::Transaction(FDBTransaction* tx) : tx_(tx, fdb_transaction_destroy) {}
ValueFuture Transaction::get(std::string_view key, fdb_bool_t snapshot) {
return ValueFuture(fdb_transaction_get(tx_, (const uint8_t*)key.data(), key.size(), snapshot));
return ValueFuture(fdb_transaction_get(tx_.get(), (const uint8_t*)key.data(), key.size(), snapshot));
}
void Transaction::set(std::string_view key, std::string_view value) {
fdb_transaction_set(tx_, (const uint8_t*)key.data(), key.size(), (const uint8_t*)value.data(), value.size());
fdb_transaction_set(tx_.get(), (const uint8_t*)key.data(), key.size(), (const uint8_t*)value.data(), value.size());
}
EmptyFuture Transaction::commit() {
return EmptyFuture(fdb_transaction_commit(tx_));
Future Transaction::commit() {
return Future(fdb_transaction_commit(tx_.get()));
}
EmptyFuture Transaction::onError(fdb_error_t err) {
return EmptyFuture(fdb_transaction_on_error(tx_, err));
Future Transaction::onError(fdb_error_t err) {
return Future(fdb_transaction_on_error(tx_.get(), err));
}
void Transaction::reset() {
fdb_transaction_reset(tx_);
}
Transaction::~Transaction() {
fdb_transaction_destroy(tx_);
fdb_transaction_reset(tx_.get());
}
fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) {

View File

@ -25,6 +25,7 @@
#include <string_view>
#include <optional>
#include <memory>
#define FDB_API_VERSION 710
#include "bindings/c/foundationdb/fdb_c.h"
@ -36,54 +37,36 @@ namespace FDBSystemTester {
class Future {
public:
Future() : future_(nullptr) {}
Future(FDBFuture* f) : future_(f) {}
virtual ~Future();
Future(FDBFuture* f);
Future& operator=(Future&& other) {
if (future_) {
reset();
}
future_ = other.future_;
other.future_ = nullptr;
return *this;
}
FDBFuture* fdbFuture() { return future_.get(); };
FDBFuture* fdbFuture() { return future_; };
fdb_error_t getError();
fdb_error_t getError() const;
void reset();
protected:
FDBFuture* future_;
std::shared_ptr<FDBFuture> future_;
};
class ValueFuture : public Future {
public:
ValueFuture() = default;
ValueFuture(FDBFuture* f) : Future(f) {}
std::optional<std::string_view> getValue();
};
class EmptyFuture : public Future {
public:
EmptyFuture() = default;
EmptyFuture(FDBFuture* f) : Future(f) {}
std::optional<std::string_view> getValue() const;
};
class Transaction {
public:
// Given an FDBDatabase, initializes a new transaction.
Transaction();
Transaction(FDBTransaction* tx);
~Transaction();
ValueFuture get(std::string_view key, fdb_bool_t snapshot);
void set(std::string_view key, std::string_view value);
EmptyFuture commit();
EmptyFuture onError(fdb_error_t err);
Future commit();
Future onError(fdb_error_t err);
void reset();
private:
FDBTransaction* tx_;
std::shared_ptr<FDBTransaction> tx_;
};
class FdbApi {

View File

@ -24,28 +24,6 @@
namespace FDBSystemTester {
namespace {
class UpdateTxActor : public TransactionActorBase {
public:
ValueFuture fGet;
void start() override {
fGet = tx()->get(dbKey("foo"), false);
ctx()->continueAfter(fGet, [this]() { this->step1(); });
}
void step1() {
std::optional<std::string_view> optStr = fGet.getValue();
tx()->set(dbKey("foo"), optStr.value_or("bar"));
commit();
}
void reset() override { fGet.reset(); }
};
} // namespace
class ApiCorrectnessWorkload : public WorkloadBase {
public:
ApiCorrectnessWorkload() : numTxLeft(10) {}
@ -56,19 +34,21 @@ public:
private:
void nextTransaction() {
if (numTxLeft > 0) {
numTxLeft--;
UpdateTxActor* tx = new UpdateTxActor();
execTransaction(tx, [this, tx]() { transactionDone(tx); });
std::cout << numTxLeft << " transactions left" << std::endl;
} else {
std::cout << "Last transaction completed" << std::endl;
}
}
std::cout << numTxLeft << " transactions left" << std::endl;
if (numTxLeft == 0)
return;
void transactionDone(UpdateTxActor* tx) {
delete tx;
nextTransaction();
numTxLeft--;
execTransaction(
[](auto ctx) {
ValueFuture fGet = ctx->tx()->get(ctx->dbKey("foo"), false);
ctx->continueAfter(fGet, [fGet, ctx]() {
std::optional<std::string_view> optStr = fGet.getValue();
ctx->tx()->set(ctx->dbKey("foo"), optStr.value_or("bar"));
ctx->commit();
});
},
[this]() { nextTransaction(); });
}
int numTxLeft;

View File

@ -40,14 +40,14 @@ void fdb_check(fdb_error_t e) {
class TransactionContext : public ITransactionContext {
public:
TransactionContext(FDBTransaction* tx,
ITransactionActor* txActor,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
const TransactionExecutorOptions& options,
IScheduler* scheduler)
: options(options), fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), finalError(0) {}
Transaction* tx() override { return &fdbTx; }
void continueAfter(Future& f, TTaskFct cont) override { doContinueAfter(f, cont); }
void continueAfter(Future f, TTaskFct cont) override { doContinueAfter(f, cont); }
void commit() override {
currFuture = fdbTx.commit();
doContinueAfter(currFuture, [this]() { done(); });
@ -64,7 +64,7 @@ public:
}
private:
void doContinueAfter(Future& f, TTaskFct cont) {
void doContinueAfter(Future f, TTaskFct cont) {
if (options.blockOnFutures) {
blockingContinueAfter(f, cont);
} else {
@ -72,11 +72,10 @@ private:
}
}
void blockingContinueAfter(Future& f, TTaskFct cont) {
Future* fptr = &f;
scheduler->schedule([this, fptr, cont]() {
fdb_check(fdb_future_block_until_ready(fptr->fdbFuture()));
fdb_error_t err = fptr->getError();
void blockingContinueAfter(Future f, TTaskFct cont) {
scheduler->schedule([this, f, cont]() mutable {
fdb_check(fdb_future_block_until_ready(f.fdbFuture()));
fdb_error_t err = f.getError();
if (err) {
currFuture = fdbTx.onError(err);
fdb_check(fdb_future_block_until_ready(currFuture.fdbFuture()));
@ -87,8 +86,9 @@ private:
});
}
void asyncContinueAfter(Future& f, TTaskFct cont) {
void asyncContinueAfter(Future f, TTaskFct cont) {
currCont = cont;
currFuture = f;
fdb_check(fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this));
}
@ -104,6 +104,8 @@ private:
fdb_check(fdb_future_set_callback(currFuture.fdbFuture(), onErrorReadyCallback, this));
} else {
scheduler->schedule(currCont);
currFuture.reset();
currCont = TTaskFct();
}
}
@ -118,6 +120,8 @@ private:
void handleOnErrorResult() {
fdb_error_t err = currFuture.getError();
currFuture.reset();
currCont = TTaskFct();
if (err) {
finalError = err;
done();
@ -129,12 +133,12 @@ private:
const TransactionExecutorOptions& options;
Transaction fdbTx;
ITransactionActor* txActor;
std::shared_ptr<ITransactionActor> txActor;
TTaskFct currCont;
TTaskFct contAfterDone;
IScheduler* scheduler;
fdb_error_t finalError;
EmptyFuture currFuture;
Future currFuture;
};
class TransactionExecutor : public ITransactionExecutor {
@ -155,7 +159,7 @@ public:
random.seed(dev());
}
void execute(ITransactionActor* txActor, TTaskFct cont) override {
void execute(std::shared_ptr<ITransactionActor> txActor, TTaskFct cont) override {
int idx = std::uniform_int_distribution<>(0, options.numDatabases - 1)(random);
FDBTransaction* tx;
fdb_check(fdb_database_create_transaction(databases[idx], &tx));

View File

@ -35,7 +35,7 @@ class ITransactionContext {
public:
virtual ~ITransactionContext() {}
virtual Transaction* tx() = 0;
virtual void continueAfter(Future& f, TTaskFct cont) = 0;
virtual void continueAfter(Future f, TTaskFct cont) = 0;
virtual void commit() = 0;
virtual void done() = 0;
virtual std::string_view dbKey(std::string_view key) = 0;
@ -58,11 +58,23 @@ protected:
Transaction* tx() { return ctx()->tx(); }
std::string_view dbKey(std::string_view key) { return ctx()->dbKey(key); }
void commit() { ctx()->commit(); }
void reset() override {}
private:
ITransactionContext* context = nullptr;
};
using TTxStartFct = std::function<void(ITransactionContext*)>;
class TransactionFct : public TransactionActorBase {
public:
TransactionFct(TTxStartFct startFct) : startFct(startFct) {}
void start() override { startFct(this->ctx()); }
private:
TTxStartFct startFct;
};
struct TransactionExecutorOptions {
std::string prefix = "";
bool blockOnFutures = false;
@ -73,7 +85,7 @@ class ITransactionExecutor {
public:
virtual ~ITransactionExecutor() {}
virtual void init(IScheduler* sched, const char* clusterFile, const TransactionExecutorOptions& options) = 0;
virtual void execute(ITransactionActor* tx, TTaskFct cont) = 0;
virtual void execute(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) = 0;
virtual void release() = 0;
};

View File

@ -1,4 +1,5 @@
#include "SysTestWorkload.h"
#include <memory>
namespace FDBSystemTester {
@ -17,7 +18,7 @@ void WorkloadBase::schedule(TTaskFct task) {
});
}
void WorkloadBase::execTransaction(ITransactionActor* tx, TTaskFct cont) {
void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskFct cont) {
txRunning++;
txExecutor->execute(tx, [this, cont]() {
txRunning--;

View File

@ -42,7 +42,10 @@ public:
protected:
void schedule(TTaskFct task);
void execTransaction(ITransactionActor* tx, TTaskFct cont);
void execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskFct cont);
void execTransaction(TTxStartFct start, TTaskFct cont) {
execTransaction(std::make_shared<TransactionFct>(start), cont);
}
void contIfDone();
private: