diff --git a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp index d65fd3d1d3..732f1778cb 100644 --- a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp @@ -102,12 +102,14 @@ private: ASSERT(results->size() == kvPairs->size()); for (int i = 0; i < kvPairs->size(); i++) { auto expected = store.get((*kvPairs)[i].key); - if ((*results)[i] != expected) { + auto actual = (*results)[i]; + if (actual != expected) { error( fmt::format("randomCommitReadOp mismatch. key: {} expected: {:.80} actual: {:.80}", (*kvPairs)[i].key, expected, - (*results)[i])); + actual)); + ASSERT(false); } } schedule(cont); diff --git a/bindings/c/test/apitester/TesterTransactionExecutor.cpp b/bindings/c/test/apitester/TesterTransactionExecutor.cpp index 563350acb9..505b55251e 100644 --- a/bindings/c/test/apitester/TesterTransactionExecutor.cpp +++ b/bindings/c/test/apitester/TesterTransactionExecutor.cpp @@ -38,19 +38,33 @@ void TransactionActorBase::complete(fdb_error_t err) { void ITransactionContext::continueAfterAll(std::vector futures, TTaskFct cont) { auto counter = std::make_shared>(futures.size()); + auto errorCode = std::make_shared>(error_code_success); + auto thisPtr = shared_from_this(); for (auto& f : futures) { - continueAfter(f, [counter, cont]() { - if (--(*counter) == 0) { - cont(); - } - }); + continueAfter( + f, + [thisPtr, f, counter, errorCode, cont]() { + if (f.getError() != error_code_success) { + (*errorCode) = f.getError(); + } + if (--(*counter) == 0) { + if (*errorCode == error_code_success) { + // all futures successful -> continue + cont(); + } else { + // at least one future failed -> retry the transaction + thisPtr->onError(*errorCode); + } + } + }, + false); } } /** * Transaction context base class, containing reusable functionality */ -class TransactionContextBase : public ITransactionContext, public std::enable_shared_from_this { +class TransactionContextBase : public ITransactionContext { public: TransactionContextBase(FDBTransaction* tx, std::shared_ptr txActor, @@ -65,7 +79,7 @@ public: Transaction* tx() override { return &fdbTx; } // Set a continuation to be executed when a future gets ready - void continueAfter(Future f, TTaskFct cont) override { doContinueAfter(f, cont); } + void continueAfter(Future f, TTaskFct cont, bool retryOnError) override { doContinueAfter(f, cont, retryOnError); } // Complete the transaction with a commit void commit() override { @@ -76,7 +90,8 @@ public: lock.unlock(); Future f = fdbTx.commit(); auto thisRef = shared_from_this(); - doContinueAfter(f, [thisRef]() { thisRef->done(); }); + doContinueAfter( + f, [thisRef]() { thisRef->done(); }, true); } // Complete the transaction without a commit (for read transactions) @@ -96,7 +111,7 @@ public: } protected: - virtual void doContinueAfter(Future f, TTaskFct cont) = 0; + virtual void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) = 0; // Clean up transaction state after completing the transaction // Note that the object may live longer, because it is referenced @@ -170,12 +185,13 @@ public: : TransactionContextBase(tx, txActor, cont, scheduler) {} protected: - void doContinueAfter(Future f, TTaskFct cont) override { + void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override { auto thisRef = std::static_pointer_cast(shared_from_this()); - scheduler->schedule([thisRef, f, cont]() mutable { thisRef->blockingContinueAfter(f, cont); }); + scheduler->schedule( + [thisRef, f, cont, retryOnError]() mutable { thisRef->blockingContinueAfter(f, cont, retryOnError); }); } - void blockingContinueAfter(Future f, TTaskFct cont) { + void blockingContinueAfter(Future f, TTaskFct cont, bool retryOnError) { std::unique_lock lock(mutex); if (txState != TxState::IN_PROGRESS) { return; @@ -190,12 +206,16 @@ protected: if (err == error_code_transaction_cancelled) { return; } - if (err == error_code_success) { + if (err == error_code_success || !retryOnError) { scheduler->schedule([cont]() { cont(); }); return; } - lock.lock(); + onError(err); + } + + virtual void onError(fdb_error_t err) override { + std::unique_lock lock(mutex); if (txState != TxState::IN_PROGRESS) { // Ignore further errors, if the transaction is in the error handing mode or completed return; @@ -227,12 +247,12 @@ public: : TransactionContextBase(tx, txActor, cont, scheduler) {} protected: - void doContinueAfter(Future f, TTaskFct cont) override { + void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override { std::unique_lock lock(mutex); if (txState != TxState::IN_PROGRESS) { return; } - callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this() }; + callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError }; lock.unlock(); fdb_error_t err = fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this); if (err) { @@ -263,12 +283,16 @@ protected: if (err == error_code_transaction_cancelled) { return; } - if (err == error_code_success) { + if (err == error_code_success || !cbInfo.retryOnError) { scheduler->schedule(cbInfo.cont); return; } - lock.lock(); + onError(err); + } + + virtual void onError(fdb_error_t err) override { + std::unique_lock lock(mutex); if (txState != TxState::IN_PROGRESS) { // Ignore further errors, if the transaction is in the error handing mode or completed return; @@ -326,6 +350,7 @@ protected: Future future; TTaskFct cont; std::shared_ptr thisRef; + bool retryOnError; }; // Map for keeping track of future waits and holding necessary object references diff --git a/bindings/c/test/apitester/TesterTransactionExecutor.h b/bindings/c/test/apitester/TesterTransactionExecutor.h index e78223bf63..f8f9234e50 100644 --- a/bindings/c/test/apitester/TesterTransactionExecutor.h +++ b/bindings/c/test/apitester/TesterTransactionExecutor.h @@ -34,7 +34,7 @@ namespace FdbApiTester { /** * Interface to be used for implementation of a concrete transaction */ -class ITransactionContext { +class ITransactionContext : public std::enable_shared_from_this { public: virtual ~ITransactionContext() {} @@ -42,11 +42,16 @@ public: virtual Transaction* tx() = 0; // Schedule a continuation to be executed when the future gets ready - virtual void continueAfter(Future f, TTaskFct cont) = 0; + // retryOnError controls whether transaction is retried in case of an error instead + // of calling the continuation + virtual void continueAfter(Future f, TTaskFct cont, bool retryOnError = true) = 0; // Complete the transaction with a commit virtual void commit() = 0; + // retry transaction on error + virtual void onError(fdb_error_t err) = 0; + // Mark the transaction as completed without committing it (for read transactions) virtual void done() = 0;