ApiTester: retry a transaction after all futures are ready

This commit is contained in:
Vaidas Gasiunas 2022-03-13 21:37:46 +01:00
parent 8ba3c107ff
commit 09bb37ce3e
3 changed files with 54 additions and 22 deletions

View File

@ -102,12 +102,14 @@ private:
ASSERT(results->size() == kvPairs->size()); ASSERT(results->size() == kvPairs->size());
for (int i = 0; i < kvPairs->size(); i++) { for (int i = 0; i < kvPairs->size(); i++) {
auto expected = store.get((*kvPairs)[i].key); auto expected = store.get((*kvPairs)[i].key);
if ((*results)[i] != expected) { auto actual = (*results)[i];
if (actual != expected) {
error( error(
fmt::format("randomCommitReadOp mismatch. key: {} expected: {:.80} actual: {:.80}", fmt::format("randomCommitReadOp mismatch. key: {} expected: {:.80} actual: {:.80}",
(*kvPairs)[i].key, (*kvPairs)[i].key,
expected, expected,
(*results)[i])); actual));
ASSERT(false);
} }
} }
schedule(cont); schedule(cont);

View File

@ -38,19 +38,33 @@ void TransactionActorBase::complete(fdb_error_t err) {
void ITransactionContext::continueAfterAll(std::vector<Future> futures, TTaskFct cont) { void ITransactionContext::continueAfterAll(std::vector<Future> futures, TTaskFct cont) {
auto counter = std::make_shared<std::atomic<int>>(futures.size()); auto counter = std::make_shared<std::atomic<int>>(futures.size());
auto errorCode = std::make_shared<std::atomic<fdb_error_t>>(error_code_success);
auto thisPtr = shared_from_this();
for (auto& f : futures) { for (auto& f : futures) {
continueAfter(f, [counter, cont]() { continueAfter(
if (--(*counter) == 0) { f,
cont(); [thisPtr, f, counter, errorCode, cont]() {
} if (f.getError() != error_code_success) {
}); (*errorCode) = f.getError();
}
if (--(*counter) == 0) {
if (*errorCode == error_code_success) {
// all futures successful -> continue
cont();
} else {
// at least one future failed -> retry the transaction
thisPtr->onError(*errorCode);
}
}
},
false);
} }
} }
/** /**
* Transaction context base class, containing reusable functionality * Transaction context base class, containing reusable functionality
*/ */
class TransactionContextBase : public ITransactionContext, public std::enable_shared_from_this<TransactionContextBase> { class TransactionContextBase : public ITransactionContext {
public: public:
TransactionContextBase(FDBTransaction* tx, TransactionContextBase(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor, std::shared_ptr<ITransactionActor> txActor,
@ -65,7 +79,7 @@ public:
Transaction* tx() override { return &fdbTx; } Transaction* tx() override { return &fdbTx; }
// Set a continuation to be executed when a future gets ready // Set a continuation to be executed when a future gets ready
void continueAfter(Future f, TTaskFct cont) override { doContinueAfter(f, cont); } void continueAfter(Future f, TTaskFct cont, bool retryOnError) override { doContinueAfter(f, cont, retryOnError); }
// Complete the transaction with a commit // Complete the transaction with a commit
void commit() override { void commit() override {
@ -76,7 +90,8 @@ public:
lock.unlock(); lock.unlock();
Future f = fdbTx.commit(); Future f = fdbTx.commit();
auto thisRef = shared_from_this(); auto thisRef = shared_from_this();
doContinueAfter(f, [thisRef]() { thisRef->done(); }); doContinueAfter(
f, [thisRef]() { thisRef->done(); }, true);
} }
// Complete the transaction without a commit (for read transactions) // Complete the transaction without a commit (for read transactions)
@ -96,7 +111,7 @@ public:
} }
protected: protected:
virtual void doContinueAfter(Future f, TTaskFct cont) = 0; virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0;
// Clean up transaction state after completing the transaction // Clean up transaction state after completing the transaction
// Note that the object may live longer, because it is referenced // Note that the object may live longer, because it is referenced
@ -170,12 +185,13 @@ public:
: TransactionContextBase(tx, txActor, cont, scheduler) {} : TransactionContextBase(tx, txActor, cont, scheduler) {}
protected: protected:
void doContinueAfter(Future f, TTaskFct cont) override { void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
auto thisRef = std::static_pointer_cast<BlockingTransactionContext>(shared_from_this()); auto thisRef = std::static_pointer_cast<BlockingTransactionContext>(shared_from_this());
scheduler->schedule([thisRef, f, cont]() mutable { thisRef->blockingContinueAfter(f, cont); }); scheduler->schedule(
[thisRef, f, cont, retryOnError]() mutable { thisRef->blockingContinueAfter(f, cont, retryOnError); });
} }
void blockingContinueAfter(Future f, TTaskFct cont) { void blockingContinueAfter(Future f, TTaskFct cont, bool retryOnError) {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
if (txState != TxState::IN_PROGRESS) { if (txState != TxState::IN_PROGRESS) {
return; return;
@ -190,12 +206,16 @@ protected:
if (err == error_code_transaction_cancelled) { if (err == error_code_transaction_cancelled) {
return; return;
} }
if (err == error_code_success) { if (err == error_code_success || !retryOnError) {
scheduler->schedule([cont]() { cont(); }); scheduler->schedule([cont]() { cont(); });
return; return;
} }
lock.lock(); onError(err);
}
virtual void onError(fdb_error_t err) override {
std::unique_lock<std::mutex> lock(mutex);
if (txState != TxState::IN_PROGRESS) { if (txState != TxState::IN_PROGRESS) {
// Ignore further errors, if the transaction is in the error handing mode or completed // Ignore further errors, if the transaction is in the error handing mode or completed
return; return;
@ -227,12 +247,12 @@ public:
: TransactionContextBase(tx, txActor, cont, scheduler) {} : TransactionContextBase(tx, txActor, cont, scheduler) {}
protected: protected:
void doContinueAfter(Future f, TTaskFct cont) override { void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
if (txState != TxState::IN_PROGRESS) { if (txState != TxState::IN_PROGRESS) {
return; return;
} }
callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this() }; callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError };
lock.unlock(); lock.unlock();
fdb_error_t err = fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this); fdb_error_t err = fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this);
if (err) { if (err) {
@ -263,12 +283,16 @@ protected:
if (err == error_code_transaction_cancelled) { if (err == error_code_transaction_cancelled) {
return; return;
} }
if (err == error_code_success) { if (err == error_code_success || !cbInfo.retryOnError) {
scheduler->schedule(cbInfo.cont); scheduler->schedule(cbInfo.cont);
return; return;
} }
lock.lock(); onError(err);
}
virtual void onError(fdb_error_t err) override {
std::unique_lock<std::mutex> lock(mutex);
if (txState != TxState::IN_PROGRESS) { if (txState != TxState::IN_PROGRESS) {
// Ignore further errors, if the transaction is in the error handing mode or completed // Ignore further errors, if the transaction is in the error handing mode or completed
return; return;
@ -326,6 +350,7 @@ protected:
Future future; Future future;
TTaskFct cont; TTaskFct cont;
std::shared_ptr<ITransactionContext> thisRef; std::shared_ptr<ITransactionContext> thisRef;
bool retryOnError;
}; };
// Map for keeping track of future waits and holding necessary object references // Map for keeping track of future waits and holding necessary object references

View File

@ -34,7 +34,7 @@ namespace FdbApiTester {
/** /**
* Interface to be used for implementation of a concrete transaction * Interface to be used for implementation of a concrete transaction
*/ */
class ITransactionContext { class ITransactionContext : public std::enable_shared_from_this<ITransactionContext> {
public: public:
virtual ~ITransactionContext() {} virtual ~ITransactionContext() {}
@ -42,11 +42,16 @@ public:
virtual Transaction* tx() = 0; virtual Transaction* tx() = 0;
// Schedule a continuation to be executed when the future gets ready // Schedule a continuation to be executed when the future gets ready
virtual void continueAfter(Future f, TTaskFct cont) = 0; // retryOnError controls whether transaction is retried in case of an error instead
// of calling the continuation
virtual void continueAfter(Future f, TTaskFct cont, bool retryOnError = true) = 0;
// Complete the transaction with a commit // Complete the transaction with a commit
virtual void commit() = 0; virtual void commit() = 0;
// retry transaction on error
virtual void onError(fdb_error_t err) = 0;
// Mark the transaction as completed without committing it (for read transactions) // Mark the transaction as completed without committing it (for read transactions)
virtual void done() = 0; virtual void done() = 0;