diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index 57a17a766a..64ed877f30 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -252,6 +252,25 @@ endif() --test-dir ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests ) + + add_test(NAME fdb_c_upgrade_single_threaded + COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py + --build-dir ${CMAKE_BINARY_DIR} + --disable-log-dump + --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml + --upgrade-path "6.3.23" "7.0.0" "6.3.23" + --process-number 1 + ) + + add_test(NAME fdb_c_upgrade_multi_threaded + COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py + --build-dir ${CMAKE_BINARY_DIR} + --disable-log-dump + --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml + --upgrade-path "6.3.23" "7.0.0" "6.3.23" + --process-number 3 + ) + endif() set(c_workloads_srcs diff --git a/bindings/c/test/apitester/TesterApiWorkload.cpp b/bindings/c/test/apitester/TesterApiWorkload.cpp index 899329f2f6..be4c7059f0 100644 --- a/bindings/c/test/apitester/TesterApiWorkload.cpp +++ b/bindings/c/test/apitester/TesterApiWorkload.cpp @@ -32,7 +32,33 @@ ApiWorkload::ApiWorkload(const WorkloadConfig& config) : WorkloadBase(config) { maxKeysPerTransaction = config.getIntOption("maxKeysPerTransaction", 50); initialSize = config.getIntOption("initialSize", 1000); readExistingKeysRatio = config.getFloatOption("readExistingKeysRatio", 0.9); + runUntilStop = config.getBoolOption("runUntilStop", false); + numRandomOperations = config.getIntOption("numRandomOperations", 1000); + numOperationsForProgressCheck = config.getIntOption("numOperationsForProgressCheck", 10); keyPrefix = fmt::format("{}/", workloadId); + numRandomOpLeft = 0; + stopReceived = false; + checkingProgress = false; + apiVersion = config.apiVersion; +} + +IWorkloadControlIfc* ApiWorkload::getControlIfc() { + if (runUntilStop) { + return 
this; + } else { + return nullptr; + } +} + +void ApiWorkload::stop() { + ASSERT(runUntilStop); + stopReceived = true; +} + +void ApiWorkload::checkProgress() { + ASSERT(runUntilStop); + numRandomOpLeft = numOperationsForProgressCheck; + checkingProgress = true; } void ApiWorkload::start() { @@ -48,6 +74,37 @@ void ApiWorkload::start() { }); } +void ApiWorkload::runTests() { + if (!runUntilStop) { + numRandomOpLeft = numRandomOperations; + } + randomOperations(); +} + +void ApiWorkload::randomOperations() { + if (runUntilStop) { + if (stopReceived) + return; + if (checkingProgress) { + int numLeft = numRandomOpLeft--; + if (numLeft == 0) { + checkingProgress = false; + confirmProgress(); + } + } + } else { + int numLeft = numRandomOpLeft--; + if (numLeft == 0) + return; + } + randomOperation([this]() { randomOperations(); }); +} + +void ApiWorkload::randomOperation(TTaskFct cont) { + // Must be overridden if used + ASSERT(false); +} + std::string ApiWorkload::randomKeyName() { return keyPrefix + Random::get().randomStringLowerCase(minKeyLength, maxKeyLength); } diff --git a/bindings/c/test/apitester/TesterApiWorkload.h b/bindings/c/test/apitester/TesterApiWorkload.h index fb9df2dcda..3e150911fc 100644 --- a/bindings/c/test/apitester/TesterApiWorkload.h +++ b/bindings/c/test/apitester/TesterApiWorkload.h @@ -23,6 +23,7 @@ #include "TesterWorkload.h" #include "TesterKeyValueStore.h" +#include <atomic> namespace FdbApiTester { @@ -30,14 +31,28 @@ namespace FdbApiTester { * Base class for implementing API testing workloads. 
* Provides various helper methods and reusable configuration parameters */ -class ApiWorkload : public WorkloadBase { +class ApiWorkload : public WorkloadBase, IWorkloadControlIfc { public: void start() override; - // Method to be overridden to run specific tests - virtual void runTests() = 0; + IWorkloadControlIfc* getControlIfc() override; + + virtual void stop() override; + + virtual void checkProgress() override; + + // Running specific tests + // The default implementation generates a workload consisting of + // random operations generated by randomOperation + virtual void runTests(); + + // Generate a random operation and schedule the continuation when done + virtual void randomOperation(TTaskFct cont); protected: + // Selected FDB API version + int apiVersion; + // The minimum length of a key int minKeyLength; @@ -59,6 +74,25 @@ protected: // The ratio of reading existing keys double readExistingKeysRatio; + // Run the workload until explicit stop + bool runUntilStop; + + // The number of operations to be executed (for runUntilStop=false) + int numRandomOperations; + + // The number of transactions to be completed for + // a successful test progress check + int numOperationsForProgressCheck; + + // Stop command received (for runUntilStop=true) + std::atomic<bool> stopReceived; + + // Progress check is active (for runUntilStop=true) + std::atomic<bool> checkingProgress; + + // Number of random operations left (for runUntilStop=false) + std::atomic<int> numRandomOpLeft; + // Key prefix std::string keyPrefix; @@ -82,6 +116,8 @@ protected: private: void populateDataTx(TTaskFct cont); + + void randomOperations(); }; } // namespace FdbApiTester diff --git a/bindings/c/test/apitester/TesterApiWrapper.cpp b/bindings/c/test/apitester/TesterApiWrapper.cpp index 4fc3b79c9a..403c060f00 100644 --- a/bindings/c/test/apitester/TesterApiWrapper.cpp +++ b/bindings/c/test/apitester/TesterApiWrapper.cpp @@ -28,7 +28,7 @@ namespace { void fdb_check(fdb_error_t e) { if (e) { - 
fmt::print(stderr, "Unexpected error: %s\n", fdb_get_error(e)); + fmt::print(stderr, "Unexpected error: {}\n", fdb_get_error(e)); std::abort(); } } diff --git a/bindings/c/test/apitester/TesterCancelTransactionWorkload.cpp b/bindings/c/test/apitester/TesterCancelTransactionWorkload.cpp index 6812841f80..f77918cce3 100644 --- a/bindings/c/test/apitester/TesterCancelTransactionWorkload.cpp +++ b/bindings/c/test/apitester/TesterCancelTransactionWorkload.cpp @@ -24,22 +24,11 @@ namespace FdbApiTester { class CancelTransactionWorkload : public ApiWorkload { public: - CancelTransactionWorkload(const WorkloadConfig& config) : ApiWorkload(config) { - numRandomOperations = config.getIntOption("numRandomOperations", 1000); - numOpLeft = numRandomOperations; - } - - void runTests() override { randomOperations(); } + CancelTransactionWorkload(const WorkloadConfig& config) : ApiWorkload(config) {} private: enum OpType { OP_CANCEL_GET, OP_CANCEL_AFTER_FIRST_GET, OP_LAST = OP_CANCEL_AFTER_FIRST_GET }; - // The number of operations to be executed - int numRandomOperations; - - // Operations counter - int numOpLeft; - // Start multiple concurrent gets and cancel the transaction void randomCancelGetTx(TTaskFct cont) { int numKeys = Random::get().randomInt(1, maxKeysPerTransaction); @@ -87,7 +76,7 @@ private: [this, cont]() { schedule(cont); }); } - void randomOperation(TTaskFct cont) { + void randomOperation(TTaskFct cont) override { OpType txType = (OpType)Random::get().randomInt(0, OP_LAST); switch (txType) { case OP_CANCEL_GET: @@ -98,14 +87,6 @@ private: break; } } - - void randomOperations() { - if (numOpLeft == 0) - return; - - numOpLeft--; - randomOperation([this]() { randomOperations(); }); - } }; WorkloadFactory<CancelTransactionWorkload> MiscTestWorkloadFactory("CancelTransaction"); diff --git a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp index bcddcd9f86..29d42c6ade 100644 --- 
a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp @@ -26,22 +26,11 @@ namespace FdbApiTester { class ApiCorrectnessWorkload : public ApiWorkload { public: - ApiCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) { - numRandomOperations = config.getIntOption("numRandomOperations", 1000); - numOpLeft = numRandomOperations; - } - - void runTests() override { randomOperations(); } + ApiCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {} private: enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ }; - // The number of operations to be executed - int numRandomOperations; - - // Operations counter - int numOpLeft; - void randomInsertOp(TTaskFct cont) { int numKeys = Random::get().randomInt(1, maxKeysPerTransaction); auto kvPairs = std::make_shared<std::vector<KeyValue>>(); @@ -82,8 +71,11 @@ private: } auto results = std::make_shared<std::vector<std::optional<std::string>>>(); execTransaction( - [kvPairs, results](auto ctx) { - ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE); + [kvPairs, results, this](auto ctx) { + if (apiVersion >= 710) { + // Test GRV caching in 7.1 and later + ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE); + } auto futures = std::make_shared<std::vector<Future>>(); for (const auto& kv : *kvPairs) { futures->push_back(ctx->tx()->get(kv.key, false)); @@ -211,14 +203,6 @@ private: break; } } - - void randomOperations() { - if (numOpLeft == 0) - return; - - numOpLeft--; - randomOperation([this]() { randomOperations(); }); - } }; WorkloadFactory<ApiCorrectnessWorkload> ApiCorrectnessWorkloadFactory("ApiCorrectness"); diff --git a/bindings/c/test/apitester/TesterOptions.h b/bindings/c/test/apitester/TesterOptions.h index 0f60ae436f..4e448f1222 100644 --- a/bindings/c/test/apitester/TesterOptions.h +++ b/bindings/c/test/apitester/TesterOptions.h @@ -29,13 +29,20 @@ namespace FdbApiTester { 
class TesterOptions { public: + // FDB API version, using the latest version by default + int apiVersion = FDB_API_VERSION; std::string clusterFile; bool trace = false; std::string traceDir; - std::string traceFormat; + std::string traceFormat = "xml"; std::string logGroup; std::string externalClientLibrary; + std::string externalClientDir; + bool disableLocalClient = false; std::string testFile; + std::string inputPipeName; + std::string outputPipeName; + int transactionRetryLimit = 0; int numFdbThreads; int numClientThreads; int numDatabases; diff --git a/bindings/c/test/apitester/TesterTestSpec.cpp b/bindings/c/test/apitester/TesterTestSpec.cpp index 6b1fb9f8c6..86a89c9116 100644 --- a/bindings/c/test/apitester/TesterTestSpec.cpp +++ b/bindings/c/test/apitester/TesterTestSpec.cpp @@ -45,10 +45,6 @@ std::unordered_map<std::string, std::function<void(const std::string& value, Tes [](const std::string& value, TestSpec* spec) { // spec->title = value; } }, - { "apiVersion", - [](const std::string& value, TestSpec* spec) { // - processIntOption(value, "apiVersion", spec->apiVersion, 700, 720); - } }, { "blockOnFutures", [](const std::string& value, TestSpec* spec) { // spec->blockOnFutures = (value == "true"); diff --git a/bindings/c/test/apitester/TesterTestSpec.h b/bindings/c/test/apitester/TesterTestSpec.h index edf413c0cf..be7a573033 100644 --- a/bindings/c/test/apitester/TesterTestSpec.h +++ b/bindings/c/test/apitester/TesterTestSpec.h @@ -42,9 +42,6 @@ struct TestSpec { // Title of the test std::string title; - // FDB API version, using the latest version by default - int apiVersion = FDB_API_VERSION; - // Use blocking waits on futures instead of scheduling callbacks bool blockOnFutures = false; diff --git a/bindings/c/test/apitester/TesterTransactionExecutor.cpp b/bindings/c/test/apitester/TesterTransactionExecutor.cpp index 36350eea02..095ba40afd 100644 --- a/bindings/c/test/apitester/TesterTransactionExecutor.cpp +++ 
b/bindings/c/test/apitester/TesterTransactionExecutor.cpp @@ -20,8 +20,10 @@ #include "TesterTransactionExecutor.h" #include "TesterUtil.h" +#include "foundationdb/fdb_c_types.h" #include "test/apitester/TesterScheduler.h" #include <memory> +#include <stdexcept> #include <unordered_map> #include <mutex> #include <atomic> @@ -31,6 +33,9 @@ namespace FdbApiTester { +constexpr int LONG_WAIT_TIME_US = 1000000; +constexpr int LARGE_NUMBER_OF_RETRIES = 5; + void TransactionActorBase::complete(fdb_error_t err) { error = err; context = {}; @@ -69,8 +74,10 @@ public: TransactionContextBase(FDBTransaction* tx, std::shared_ptr<ITransactionActor> txActor, TTaskFct cont, - IScheduler* scheduler) - : fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), txState(TxState::IN_PROGRESS) {} + IScheduler* scheduler, + int retryLimit) + : fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit), + txState(TxState::IN_PROGRESS), commitCalled(false) {} // A state machine: // IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE @@ -87,6 +94,7 @@ public: if (txState != TxState::IN_PROGRESS) { return; } + commitCalled = true; lock.unlock(); Future f = fdbTx.commit(); auto thisRef = shared_from_this(); @@ -102,6 +110,11 @@ public: } txState = TxState::DONE; lock.unlock(); + if (retriedErrors.size() >= LARGE_NUMBER_OF_RETRIES) { + fmt::print("Transaction succeeded after {} retries on errors: {}\n", + retriedErrors.size(), + fmt::join(retriedErrors, ", ")); + } // cancel transaction so that any pending operations on it // fail gracefully fdbTx.cancel(); @@ -146,11 +159,29 @@ protected: } else { std::unique_lock<std::mutex> lock(mutex); txState = TxState::IN_PROGRESS; + commitCalled = false; lock.unlock(); txActor->start(); } } + // Checks if a transaction can be retried. 
Fails the transaction if the check fails + bool canRetry(fdb_error_t lastErr) { + ASSERT(txState == TxState::ON_ERROR); + retriedErrors.push_back(lastErr); + if (retryLimit == 0 || retriedErrors.size() <= retryLimit) { + if (retriedErrors.size() == LARGE_NUMBER_OF_RETRIES) { + fmt::print("Transaction already retried {} times, on errors: {}\n", + retriedErrors.size(), + fmt::join(retriedErrors, ", ")); + } + return true; + } + fmt::print("Transaction retry limit reached. Retried on errors: {}\n", fmt::join(retriedErrors, ", ")); + transactionFailed(lastErr); + return false; + } + // FDB transaction Transaction fdbTx; @@ -166,11 +197,26 @@ protected: // Reference to the scheduler IScheduler* scheduler; + // Retry limit + int retryLimit; + // Transaction execution state TxState txState; // onError future used in ON_ERROR state Future onErrorFuture; + + // The error code on which onError was called + fdb_error_t onErrorArg; + + // The time point of calling onError + TimePoint onErrorCallTimePoint; + + // Transaction is committed or being committed + bool commitCalled; + + // A history of errors on which the transaction was retried + std::vector<fdb_error_t> retriedErrors; }; /** @@ -181,8 +227,9 @@ public: BlockingTransactionContext(FDBTransaction* tx, std::shared_ptr<ITransactionActor> txActor, TTaskFct cont, - IScheduler* scheduler) - : TransactionContextBase(tx, txActor, cont, scheduler) {} + IScheduler* scheduler, + int retryLimit) + : TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {} protected: void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override { @@ -197,12 +244,21 @@ protected: return; } lock.unlock(); + auto start = timeNow(); fdb_error_t err = fdb_future_block_until_ready(f.fdbFuture()); if (err) { transactionFailed(err); return; } err = f.getError(); + auto waitTimeUs = timeElapsedInUs(start); + if (waitTimeUs > LONG_WAIT_TIME_US) { + fmt::print("Long waiting time on a future: {:.3f}s, return code {} ({}), commit 
called: {}\n", + microsecToSec(waitTimeUs), + err, + fdb_get_error(err), + commitCalled); + } if (err == error_code_transaction_cancelled) { return; } @@ -223,13 +279,29 @@ protected: txState = TxState::ON_ERROR; lock.unlock(); + if (!canRetry(err)) { + return; + } + ASSERT(!onErrorFuture); onErrorFuture = fdbTx.onError(err); + onErrorArg = err; + + auto start = timeNow(); fdb_error_t err2 = fdb_future_block_until_ready(onErrorFuture.fdbFuture()); if (err2) { transactionFailed(err2); return; } + auto waitTimeUs = timeElapsedInUs(start); + if (waitTimeUs > LONG_WAIT_TIME_US) { + fdb_error_t err3 = onErrorFuture.getError(); + fmt::print("Long waiting time on onError({}) future: {:.3f}s, return code {} ({})\n", + onErrorArg, + microsecToSec(waitTimeUs), + err3, + fdb_get_error(err3)); + } auto thisRef = std::static_pointer_cast<BlockingTransactionContext>(shared_from_this()); scheduler->schedule([thisRef]() { thisRef->handleOnErrorResult(); }); } @@ -243,8 +315,9 @@ public: AsyncTransactionContext(FDBTransaction* tx, std::shared_ptr<ITransactionActor> txActor, TTaskFct cont, - IScheduler* scheduler) - : TransactionContextBase(tx, txActor, cont, scheduler) {} + IScheduler* scheduler, + int retryLimit) + : TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {} protected: void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override { @@ -252,7 +325,7 @@ protected: if (txState != TxState::IN_PROGRESS) { return; } - callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError }; + callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError, timeNow() }; lock.unlock(); fdb_error_t err = fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this); if (err) { @@ -264,11 +337,20 @@ protected: } static void futureReadyCallback(FDBFuture* f, void* param) { - AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param; - txCtx->onFutureReady(f); + try { + AsyncTransactionContext* txCtx = 
(AsyncTransactionContext*)param; + txCtx->onFutureReady(f); + } catch (std::runtime_error& err) { + fmt::print("Unexpected exception in callback {}\n", err.what()); + abort(); + } catch (...) { + fmt::print("Unknown error in callback\n"); + abort(); + } } void onFutureReady(FDBFuture* f) { + auto endTime = timeNow(); injectRandomSleep(); // Hold a reference to this to avoid it to be // destroyed before releasing the mutex @@ -283,6 +365,13 @@ protected: } lock.unlock(); fdb_error_t err = fdb_future_get_error(f); + auto waitTimeUs = timeElapsedInUs(cbInfo.startTime, endTime); + if (waitTimeUs > LONG_WAIT_TIME_US) { + fmt::print("Long waiting time on a future: {:.3f}s, return code {} ({})\n", + microsecToSec(waitTimeUs), + err, + fdb_get_error(err)); + } if (err == error_code_transaction_cancelled) { return; } @@ -302,8 +391,14 @@ protected: txState = TxState::ON_ERROR; lock.unlock(); + if (!canRetry(err)) { + return; + } + ASSERT(!onErrorFuture); + onErrorArg = err; onErrorFuture = tx()->onError(err); + onErrorCallTimePoint = timeNow(); onErrorThisRef = std::static_pointer_cast<AsyncTransactionContext>(shared_from_this()); fdb_error_t err2 = fdb_future_set_callback(onErrorFuture.fdbFuture(), onErrorReadyCallback, this); if (err2) { @@ -313,11 +408,28 @@ protected: } static void onErrorReadyCallback(FDBFuture* f, void* param) { - AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param; - txCtx->onErrorReady(f); + try { + AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param; + txCtx->onErrorReady(f); + } catch (std::runtime_error& err) { + fmt::print("Unexpected exception in callback {}\n", err.what()); + abort(); + } catch (...) 
{ + fmt::print("Unknown error in callback\n"); + abort(); + } } void onErrorReady(FDBFuture* f) { + auto waitTimeUs = timeElapsedInUs(onErrorCallTimePoint); + if (waitTimeUs > LONG_WAIT_TIME_US) { + fdb_error_t err = onErrorFuture.getError(); + fmt::print("Long waiting time on onError({}): {:.3f}s, return code {} ({})\n", + onErrorArg, + microsecToSec(waitTimeUs), + err, + fdb_get_error(err)); + } injectRandomSleep(); auto thisRef = onErrorThisRef; onErrorThisRef = {}; @@ -353,6 +465,7 @@ protected: TTaskFct cont; std::shared_ptr<ITransactionContext> thisRef; bool retryOnError; + TimePoint startTime; }; // Map for keeping track of future waits and holding necessary object references @@ -385,9 +498,11 @@ protected: } else { std::shared_ptr<ITransactionContext> ctx; if (options.blockOnFutures) { - ctx = std::make_shared<BlockingTransactionContext>(tx, txActor, cont, scheduler); + ctx = std::make_shared<BlockingTransactionContext>( + tx, txActor, cont, scheduler, options.transactionRetryLimit); } else { - ctx = std::make_shared<AsyncTransactionContext>(tx, txActor, cont, scheduler); + ctx = std::make_shared<AsyncTransactionContext>( + tx, txActor, cont, scheduler, options.transactionRetryLimit); } txActor->init(ctx); txActor->start(); diff --git a/bindings/c/test/apitester/TesterTransactionExecutor.h b/bindings/c/test/apitester/TesterTransactionExecutor.h index f8f9234e50..a8fb438d14 100644 --- a/bindings/c/test/apitester/TesterTransactionExecutor.h +++ b/bindings/c/test/apitester/TesterTransactionExecutor.h @@ -123,6 +123,9 @@ struct TransactionExecutorOptions { // The size of the database instance pool int numDatabases = 1; + + // Maximum number of retries per transaction (0 - unlimited) + int transactionRetryLimit = 0; }; /** diff --git a/bindings/c/test/apitester/TesterUtil.cpp b/bindings/c/test/apitester/TesterUtil.cpp index 42f9eb218f..a7e5414f85 100644 --- a/bindings/c/test/apitester/TesterUtil.cpp +++ b/bindings/c/test/apitester/TesterUtil.cpp @@ -20,9 +20,18 
@@ #include "TesterUtil.h" #include <cstdio> +#include <algorithm> +#include <ctype.h> +#include <chrono> namespace FdbApiTester { +std::string lowerCase(const std::string& str) { + std::string res = str; + std::transform(res.begin(), res.end(), res.begin(), [](unsigned char c) { return static_cast<char>(::tolower(c)); }); + return res; +} + Random::Random() { std::random_device dev; random.seed(dev()); @@ -53,6 +62,7 @@ bool Random::randomBool(double trueRatio) { void print_internal_error(const char* msg, const char* file, int line) { fprintf(stderr, "Assertion %s failed @ %s %d:\n", msg, file, line); + fflush(stderr); } } // namespace FdbApiTester \ No newline at end of file diff --git a/bindings/c/test/apitester/TesterUtil.h b/bindings/c/test/apitester/TesterUtil.h index c6d7de92bc..491f6ec565 100644 --- a/bindings/c/test/apitester/TesterUtil.h +++ b/bindings/c/test/apitester/TesterUtil.h @@ -27,9 +27,11 @@ #include <ostream> #include <optional> #include <fmt/format.h> +#include <chrono> namespace fmt { +// fmt::format formatting for std::optional<T> template <typename T> struct formatter<std::optional<T>> : fmt::formatter<T> { @@ -47,6 +49,8 @@ struct formatter<std::optional<T>> : fmt::formatter<T> { namespace FdbApiTester { +std::string lowerCase(const std::string& str); + class Random { public: Random(); @@ -82,6 +86,25 @@ void print_internal_error(const char* msg, const char* file, int line); } \ } while (false) // For use in destructors, where throwing exceptions is extremely dangerous +using TimePoint = std::chrono::steady_clock::time_point; +using TimeDuration = std::chrono::microseconds::rep; + +static inline TimePoint timeNow() { + return std::chrono::steady_clock::now(); +} + +static inline TimeDuration timeElapsedInUs(const TimePoint& start, const TimePoint& end) { + return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); +} + +static inline TimeDuration timeElapsedInUs(const TimePoint& start) { + return timeElapsedInUs(start, timeNow()); +} + +static inline double 
microsecToSec(TimeDuration timeUs) { + return timeUs / 1000000.0; +} + } // namespace FdbApiTester #endif \ No newline at end of file diff --git a/bindings/c/test/apitester/TesterWorkload.cpp b/bindings/c/test/apitester/TesterWorkload.cpp index 19b28731e9..6bf0c6fff9 100644 --- a/bindings/c/test/apitester/TesterWorkload.cpp +++ b/bindings/c/test/apitester/TesterWorkload.cpp @@ -20,11 +20,14 @@ #include "TesterWorkload.h" #include "TesterUtil.h" +#include "fmt/core.h" #include "test/apitester/TesterScheduler.h" #include <cstdlib> #include <memory> #include <fmt/format.h> #include <vector> +#include <iostream> +#include <cstdio> namespace FdbApiTester { @@ -58,6 +61,23 @@ double WorkloadConfig::getFloatOption(const std::string& name, double defaultVal } } +bool WorkloadConfig::getBoolOption(const std::string& name, bool defaultVal) const { + auto iter = options.find(name); + if (iter == options.end()) { + return defaultVal; + } else { + std::string val = lowerCase(iter->second); + if (val == "true") { + return true; + } else if (val == "false") { + return false; + } else { + throw TesterError( + fmt::format("Invalid workload configuration. 
Invalid value {} for {}", iter->second, name)); + } + } +} + WorkloadBase::WorkloadBase(const WorkloadConfig& config) : manager(nullptr), tasksScheduled(0), numErrors(0), clientId(config.clientId), numClients(config.numClients), failed(false) { @@ -90,7 +110,7 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskF if (tx->getErrorCode() == error_code_success) { cont(); } else { - std::string msg = fmt::format("Transaction failed with error: {} ({}})", err, fdb_get_error(err)); + std::string msg = fmt::format("Transaction failed with error: {} ({})", err, fdb_get_error(err)); if (failOnError) { error(msg); failed = true; @@ -127,9 +147,15 @@ void WorkloadBase::scheduledTaskDone() { } } +void WorkloadBase::confirmProgress() { + info("Progress confirmed"); + manager->confirmProgress(this); +} + void WorkloadManager::add(std::shared_ptr<IWorkload> workload, TTaskFct cont) { std::unique_lock<std::mutex> lock(mutex); - workloads[workload.get()] = WorkloadInfo{ workload, cont }; + workloads[workload.get()] = WorkloadInfo{ workload, cont, workload->getControlIfc(), false }; + fmt::print(stderr, "Workload {} added\n", workload->getWorkloadId()); } void WorkloadManager::run() { @@ -144,6 +170,13 @@ void WorkloadManager::run() { iter->start(); } scheduler->join(); + if (ctrlInputThread.joinable()) { + ctrlInputThread.join(); + } + if (outputPipe.is_open()) { + outputPipe << "DONE" << std::endl; + outputPipe.close(); + } if (failed()) { fmt::print(stderr, "{} workloads failed\n", numWorkloadsFailed); } else { @@ -169,6 +202,99 @@ void WorkloadManager::workloadDone(IWorkload* workload, bool failed) { } } +void WorkloadManager::openControlPipes(const std::string& inputPipeName, const std::string& outputPipeName) { + if (!inputPipeName.empty()) { + ctrlInputThread = std::thread(&WorkloadManager::readControlInput, this, inputPipeName); + } + if (!outputPipeName.empty()) { + fmt::print(stderr, "Opening pipe {} for writing\n", outputPipeName); + 
outputPipe.open(outputPipeName, std::ofstream::out); + } +} + +// Thread body: reads newline-terminated commands (STOP, CHECK) from the named pipe +// until fgetc reports EOF; any other command is logged and ignored +void WorkloadManager::readControlInput(std::string pipeName) { + fmt::print(stderr, "Opening pipe {} for reading\n", pipeName); + // Open in binary mode and read char-by-char to avoid + // any kind of buffering + FILE* f = fopen(pipeName.c_str(), "rb"); + if (f == NULL) { + // No control channel means STOP can never arrive: fail loudly instead of passing a null stream on + fmt::print(stderr, "Failed to open pipe {} for reading\n", pipeName); + std::abort(); + } + setbuf(f, NULL); + std::string line; + while (true) { + int ch = fgetc(f); + if (ch == EOF) { + // Release the stream before the control thread exits + fclose(f); + return; + } + if (ch != '\n') { + line += ch; + continue; + } + if (line.empty()) { + continue; + } + fmt::print(stderr, "Received {} command\n", line); + if (line == "STOP") { + handleStopCommand(); + } else if (line == "CHECK") { + handleCheckCommand(); + } + line.clear(); + } +} + +void WorkloadManager::handleStopCommand() { + std::unique_lock<std::mutex> lock(mutex); + for (auto& iter : workloads) { + IWorkloadControlIfc* controlIfc = iter.second.controlIfc; + if (controlIfc) { + controlIfc->stop(); + } + } +} + +void WorkloadManager::handleCheckCommand() { + std::unique_lock<std::mutex> lock(mutex); + // Request to confirm progress from all workloads + // providing the control interface + for (auto& iter : workloads) { + IWorkloadControlIfc* controlIfc = iter.second.controlIfc; + if (controlIfc) { + iter.second.progressConfirmed = false; + controlIfc->checkProgress(); + } + } +} + +void WorkloadManager::confirmProgress(IWorkload* workload) { + std::unique_lock<std::mutex> lock(mutex); + // Save the progress confirmation of the workload + auto iter = workloads.find(workload); + ASSERT(iter != workloads.end()); + 
iter->second.progressConfirmed = true; + // Check if all workloads have confirmed progress + bool allConfirmed = true; + for (auto& iter : workloads) { + if (iter.second.controlIfc && !iter.second.progressConfirmed) { + allConfirmed = false; + break; + } + } + lock.unlock(); + if (allConfirmed) { + // Notify the test controller about the successful progress check + ASSERT(outputPipe.is_open()); + outputPipe << "CHECK_OK" << std::endl; + } +} + 
std::shared_ptr<IWorkload> IWorkloadFactory::create(std::string const& name, const WorkloadConfig& config) { auto it = factories().find(name); if (it == factories().end()) diff --git a/bindings/c/test/apitester/TesterWorkload.h b/bindings/c/test/apitester/TesterWorkload.h index d1e9f94c3d..4c7946d973 100644 --- a/bindings/c/test/apitester/TesterWorkload.h +++ b/bindings/c/test/apitester/TesterWorkload.h @@ -29,11 +29,23 @@ #include <atomic> #include <unordered_map> #include <mutex> +#include <thread> +#include <fstream> namespace FdbApiTester { class WorkloadManager; +class IWorkloadControlIfc { +public: + // Stop the workload + virtual void stop() = 0; + + // Check if the test is progressing from the point of calling + // Progress must be confirmed by calling confirmProgress on the workload manager + virtual void checkProgress() = 0; +}; + // Workoad interface class IWorkload { public: @@ -44,6 +56,12 @@ public: // Start executing the workload virtual void start() = 0; + + // Get workload identifier + virtual std::string getWorkloadId() = 0; + + // Get workload control interface if supported, nullptr otherwise + virtual IWorkloadControlIfc* getControlIfc() = 0; }; // Workload configuration @@ -57,12 +75,16 @@ struct WorkloadConfig { // Total number of clients int numClients; + // Selected FDB API version + int apiVersion; + // Workload options: as key-value pairs std::unordered_map<std::string, std::string> options; // Get option of a certain type by name. 
Throws an exception if the values is of a wrong type int getIntOption(const std::string& name, int defaultVal) const; double getFloatOption(const std::string& name, double defaultVal) const; + bool getBoolOption(const std::string& name, bool defaultVal) const; }; // A base class for test workloads @@ -74,6 +96,10 @@ public: // Initialize the workload void init(WorkloadManager* manager) override; + IWorkloadControlIfc* getControlIfc() override { return nullptr; } + + std::string getWorkloadId() override { return workloadId; } + protected: // Schedule the a task as a part of the workload void schedule(TTaskFct task); @@ -92,6 +118,9 @@ protected: // Log an info message void info(const std::string& msg); + // Confirm a successful progress check + void confirmProgress(); + private: WorkloadManager* manager; @@ -130,6 +159,9 @@ public: WorkloadManager(ITransactionExecutor* txExecutor, IScheduler* scheduler) : txExecutor(txExecutor), scheduler(scheduler), numWorkloadsFailed(0) {} + // Open named pipes for communication with the test controller + void openControlPipes(const std::string& inputPipeName, const std::string& outputPipeName); + // Add a workload // A continuation is to be specified for subworkloads void add(std::shared_ptr<IWorkload> workload, TTaskFct cont = NO_OP_TASK); @@ -152,11 +184,27 @@ private: std::shared_ptr<IWorkload> ref; // Continuation to be executed after completing the workload TTaskFct cont; + // Control interface of the workload (optional) + IWorkloadControlIfc* controlIfc; + // Progress check confirmation status + bool progressConfirmed; }; // To be called by a workload to notify that it is done void workloadDone(IWorkload* workload, bool failed); + // To be called by a workload to confirm a successful progress check + void confirmProgress(IWorkload* workload); + + // Receive and handle control commands from the input pipe + void readControlInput(std::string pipeName); + + // Handle STOP command received from the test controller + void 
handleStopCommand(); + + // Handle CHECK command received from the test controller + void handleCheckCommand(); + // Transaction executor to be used by the workloads ITransactionExecutor* txExecutor; @@ -171,6 +219,12 @@ private: // Number of workloads failed int numWorkloadsFailed; + + // Thread for receiving test control commands + std::thread ctrlInputThread; + + // Output pipe for emitting test control events + std::ofstream outputPipe; }; // A workload factory diff --git a/bindings/c/test/apitester/fdb_c_api_tester.cpp b/bindings/c/test/apitester/fdb_c_api_tester.cpp index 062ffb95f7..a3de942226 100644 --- a/bindings/c/test/apitester/fdb_c_api_tester.cpp +++ b/bindings/c/test/apitester/fdb_c_api_tester.cpp @@ -45,7 +45,13 @@ enum TesterOptionId { OPT_TRACE_FORMAT, OPT_KNOB, OPT_EXTERNAL_CLIENT_LIBRARY, - OPT_TEST_FILE + OPT_EXTERNAL_CLIENT_DIRECTORY, + OPT_DISABLE_LOCAL_CLIENT, + OPT_TEST_FILE, + OPT_INPUT_PIPE, + OPT_OUTPUT_PIPE, + OPT_FDB_API_VERSION, + OPT_TRANSACTION_RETRY_LIMIT }; CSimpleOpt::SOption TesterOptionDefs[] = // @@ -59,8 +65,14 @@ CSimpleOpt::SOption TesterOptionDefs[] = // { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, { OPT_KNOB, "--knob-", SO_REQ_SEP }, { OPT_EXTERNAL_CLIENT_LIBRARY, "--external-client-library", SO_REQ_SEP }, + { OPT_EXTERNAL_CLIENT_DIRECTORY, "--external-client-dir", SO_REQ_SEP }, + { OPT_DISABLE_LOCAL_CLIENT, "--disable-local-client", SO_NONE }, { OPT_TEST_FILE, "-f", SO_REQ_SEP }, { OPT_TEST_FILE, "--test-file", SO_REQ_SEP }, + { OPT_INPUT_PIPE, "--input-pipe", SO_REQ_SEP }, + { OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP }, + { OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP }, + { OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP }, SO_END_OF_OPTIONS }; void printProgramUsage(const char* execName) { @@ -84,9 +96,22 @@ void printProgramUsage(const char* execName) { " Changes a knob option. 
KNOBNAME should be lowercase.\n" " --external-client-library FILE\n" " Path to the external client library.\n" + " --external-client-dir DIR\n" + " Directory containing external client libraries.\n" + " --disable-local-client\n" + " Disable the local client, i.e. use only external client libraries.\n" + " --input-pipe NAME\n" + " Name of the input pipe for communication with the test controller.\n" + " --output-pipe NAME\n" + " Name of the output pipe for communication with the test controller.\n" + " --api-version VERSION\n" + " Required FDB API version (default %d).\n" + " --transaction-retry-limit NUMBER\n" + " Maximum number of retries per transaction (default: 0 - unlimited)\n" " -f, --test-file FILE\n" " Test file to run.\n" - " -h, --help Display this help and exit.\n"); + " -h, --help Display this help and exit.\n", + FDB_API_VERSION); } // Extracts the key for command line arguments that are specified with a prefix (e.g. --knob-). @@ -106,6 +131,22 @@ bool validateTraceFormat(std::string_view format) { return format == "xml" || format == "json"; } +const int MIN_TESTABLE_API_VERSION = 400; + +// Parse an integer option value and check that it lies within [minValue, maxValue]; throws TesterError otherwise +void processIntOption(const std::string& optionName, const std::string& value, int minValue, int maxValue, int& res) { + char* endptr; + // Parse as long, so that overflowing input fails the range check instead of being truncated to int + const long parsed = strtol(value.c_str(), &endptr, 10); + if (endptr == value.c_str() || *endptr != '\0') { + throw TesterError(fmt::format("Invalid value {} for {}", value, optionName)); + } + if (parsed < minValue || parsed > maxValue) { + throw TesterError(fmt::format("Value for {} must be between {} and {}", optionName, minValue, maxValue)); + } + res = static_cast<int>(parsed); +} + bool processArg(TesterOptions& options, const CSimpleOpt& args) { switch (args.OptionId()) { case OPT_CONNFILE: @@ -139,11 +180,29 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) { case OPT_EXTERNAL_CLIENT_LIBRARY: options.externalClientLibrary = args.OptionArg(); break; - + case OPT_EXTERNAL_CLIENT_DIRECTORY: + options.externalClientDir = args.OptionArg(); + break; + case OPT_DISABLE_LOCAL_CLIENT: + 
options.disableLocalClient = true; + break; case OPT_TEST_FILE: options.testFile = args.OptionArg(); options.testSpec = readTomlTestSpec(options.testFile); break; + case OPT_INPUT_PIPE: + options.inputPipeName = args.OptionArg(); + break; + case OPT_OUTPUT_PIPE: + options.outputPipeName = args.OptionArg(); + break; + case OPT_FDB_API_VERSION: + processIntOption( + args.OptionText(), args.OptionArg(), MIN_TESTABLE_API_VERSION, FDB_API_VERSION, options.apiVersion); + break; + case OPT_TRANSACTION_RETRY_LIMIT: + processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit); + break; } return true; } @@ -184,6 +240,16 @@ void applyNetworkOptions(TesterOptions& options) { fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT)); fdb_check( FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_LIBRARY, options.externalClientLibrary)); + } else if (!options.externalClientDir.empty()) { + if (options.disableLocalClient) { + fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT)); + } + fdb_check( + FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY, options.externalClientDir)); + } else { + if (options.disableLocalClient) { + throw TesterError("Invalid options: Cannot disable local client if no external library is provided"); + } } if (options.testSpec.multiThreaded) { @@ -220,34 +286,44 @@ void randomizeOptions(TesterOptions& options) { } bool runWorkloads(TesterOptions& options) { - TransactionExecutorOptions txExecOptions; - txExecOptions.blockOnFutures = options.testSpec.blockOnFutures; - txExecOptions.numDatabases = options.numDatabases; - txExecOptions.databasePerTransaction = options.testSpec.databasePerTransaction; + try { + TransactionExecutorOptions txExecOptions; + txExecOptions.blockOnFutures = options.testSpec.blockOnFutures; + txExecOptions.numDatabases = options.numDatabases; + txExecOptions.databasePerTransaction = 
options.testSpec.databasePerTransaction; + txExecOptions.transactionRetryLimit = options.transactionRetryLimit; - std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads); - std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions); - scheduler->start(); - txExecutor->init(scheduler.get(), options.clusterFile.c_str()); + std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads); + std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions); + txExecutor->init(scheduler.get(), options.clusterFile.c_str()); - WorkloadManager workloadMgr(txExecutor.get(), scheduler.get()); - for (const auto& workloadSpec : options.testSpec.workloads) { - for (int i = 0; i < options.numClients; i++) { - WorkloadConfig config; - config.name = workloadSpec.name; - config.options = workloadSpec.options; - config.clientId = i; - config.numClients = options.numClients; - std::shared_ptr<IWorkload> workload = IWorkloadFactory::create(workloadSpec.name, config); - if (!workload) { - throw TesterError(fmt::format("Unknown workload '{}'", workloadSpec.name)); + WorkloadManager workloadMgr(txExecutor.get(), scheduler.get()); + for (const auto& workloadSpec : options.testSpec.workloads) { + for (int i = 0; i < options.numClients; i++) { + WorkloadConfig config; + config.name = workloadSpec.name; + config.options = workloadSpec.options; + config.clientId = i; + config.numClients = options.numClients; + config.apiVersion = options.apiVersion; + std::shared_ptr<IWorkload> workload = IWorkloadFactory::create(workloadSpec.name, config); + if (!workload) { + throw TesterError(fmt::format("Unknown workload '{}'", workloadSpec.name)); + } + workloadMgr.add(workload); } - workloadMgr.add(workload); } - } + if (!options.inputPipeName.empty() || !options.outputPipeName.empty()) { + workloadMgr.openControlPipes(options.inputPipeName, options.outputPipeName); + } - workloadMgr.run(); - return 
!workloadMgr.failed(); + scheduler->start(); + workloadMgr.run(); + return !workloadMgr.failed(); + } catch (const std::runtime_error& err) { + fmt::print(stderr, "ERROR: {}\n", err.what()); + return false; + } } } // namespace @@ -264,7 +340,7 @@ int main(int argc, char** argv) { } randomizeOptions(options); - fdb_check(fdb_select_api_version(options.testSpec.apiVersion)); + fdb_check(fdb_select_api_version(options.apiVersion)); applyNetworkOptions(options); fdb_check(fdb_setup_network()); diff --git a/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml b/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml new file mode 100644 index 0000000000..86e65c5918 --- /dev/null +++ b/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml @@ -0,0 +1,35 @@ +[[test]] +title = 'Mixed Workload for Upgrade Tests with a Multi-Threaded Client' +multiThreaded = true +buggify = true +databasePerTransaction = false +minFdbThreads = 2 +maxFdbThreads = 8 +minDatabases = 2 +maxDatabases = 8 +minClientThreads = 2 +maxClientThreads = 8 +minClients = 2 +maxClients = 8 + + [[test.workload]] + name = 'ApiCorrectness' + minKeyLength = 1 + maxKeyLength = 64 + minValueLength = 1 + maxValueLength = 1000 + maxKeysPerTransaction = 50 + initialSize = 100 + runUntilStop = true + readExistingKeysRatio = 0.9 + + [[test.workload]] + name = 'CancelTransaction' + minKeyLength = 1 + maxKeyLength = 64 + minValueLength = 1 + maxValueLength = 1000 + maxKeysPerTransaction = 50 + initialSize = 100 + runUntilStop = true + readExistingKeysRatio = 0.9 \ No newline at end of file diff --git a/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml b/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml new file mode 100644 index 0000000000..42df76521b --- /dev/null +++ b/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml @@ -0,0 +1,33 @@ +[[test]] +title = 'Mixed Workload for Upgrade Tests with a Single FDB 
Thread' +multiThreaded = false +buggify = true +databasePerTransaction = false +minDatabases = 2 +maxDatabases = 8 +minClientThreads = 2 +maxClientThreads = 8 +minClients = 2 +maxClients = 8 + + [[test.workload]] + name = 'ApiCorrectness' + minKeyLength = 1 + maxKeyLength = 64 + minValueLength = 1 + maxValueLength = 1000 + maxKeysPerTransaction = 50 + initialSize = 100 + runUntilStop = true + readExistingKeysRatio = 0.9 + + [[test.workload]] + name = 'CancelTransaction' + minKeyLength = 1 + maxKeyLength = 64 + minValueLength = 1 + maxValueLength = 1000 + maxKeysPerTransaction = 50 + initialSize = 100 + runUntilStop = true + readExistingKeysRatio = 0.9 \ No newline at end of file diff --git a/tests/TestRunner/fake_cluster.py b/tests/TestRunner/fake_cluster.py index eed4653532..bbd4b41ee9 100755 --- a/tests/TestRunner/fake_cluster.py +++ b/tests/TestRunner/fake_cluster.py @@ -4,10 +4,10 @@ import os import shutil import subprocess import sys -from local_cluster import LocalCluster +from local_cluster import random_secret_string from argparse import ArgumentParser, RawDescriptionHelpFormatter from pathlib import Path -from random import choice + class ClusterFileGenerator: def __init__(self, output_dir: str): @@ -15,8 +15,7 @@ class ClusterFileGenerator: assert self.output_dir.exists(), "{} does not exist".format(output_dir) assert self.output_dir.is_dir(), "{} is not a directory".format(output_dir) self.tmp_dir = self.output_dir.joinpath( - 'tmp', - ''.join(choice(LocalCluster.valid_letters_for_secret) for i in range(16))) + 'tmp', random_secret_string(16)) self.tmp_dir.mkdir(parents=True) self.cluster_file_path = self.tmp_dir.joinpath('fdb.cluster') @@ -43,11 +42,13 @@ if __name__ == '__main__': Before the command is executed, the following arguments will be preprocessed: - All occurrences of @CLUSTER_FILE@ will be replaced with the path to the generated cluster file. 
- The environment variable FDB_CLUSTER_FILE is set to the generated cluster file for the command if + The environment variable FDB_CLUSTER_FILE is set to the generated cluster file for the command if it is not set already. """) - parser.add_argument('--output-dir', '-o', metavar='OUTPUT_DIRECTORY', help='Directory where output files are written', required=True) - parser.add_argument('cmd', metavar="COMMAND", nargs="+", help="The command to run") + parser.add_argument('--output-dir', '-o', metavar='OUTPUT_DIRECTORY', + help='Directory where output files are written', required=True) + parser.add_argument('cmd', metavar="COMMAND", + nargs="+", help="The command to run") args = parser.parse_args() errcode = 1 @@ -61,7 +62,9 @@ if __name__ == '__main__': cmd_args.append(cmd) env = dict(**os.environ) - env['FDB_CLUSTER_FILE'] = env.get('FDB_CLUSTER_FILE', generator.cluster_file_path) - errcode = subprocess.run(cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode + env['FDB_CLUSTER_FILE'] = env.get( + 'FDB_CLUSTER_FILE', generator.cluster_file_path) + errcode = subprocess.run( + cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode sys.exit(errcode) diff --git a/tests/TestRunner/local_cluster.py b/tests/TestRunner/local_cluster.py index 30162147fe..2e56457770 100644 --- a/tests/TestRunner/local_cluster.py +++ b/tests/TestRunner/local_cluster.py @@ -1,10 +1,11 @@ +import json from pathlib import Path -from argparse import ArgumentParser import random import string import subprocess -import sys +import os import socket +import time def _get_free_port_internal(): @@ -25,6 +26,19 @@ def get_free_port(): return port +def is_port_in_use(port): + import socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(('localhost', port)) == 0 + + +valid_letters_for_secret = string.ascii_letters + string.digits + + +def random_secret_string(len): + return ''.join(random.choice(valid_letters_for_secret) for i in range(len)) + 
+ class LocalCluster: configuration_template = """ ## foundationdb.conf @@ -48,7 +62,7 @@ cluster-file = {etcdir}/fdb.cluster ## Default parameters for individual fdbserver processes [fdbserver] command = {fdbserver_bin} -public-address = auto:$ID +public-address = {ip_address}:$ID listen-address = public datadir = {datadir}/$ID logdir = {logdir} @@ -65,76 +79,148 @@ logdir = {logdir} ## An individual fdbserver process with id 4000 ## Parameters set here override defaults from the [fdbserver] section - """ - valid_letters_for_secret = string.ascii_letters + string.digits - - def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, - fdbcli_binary: str, process_number: int, create_config=True, port=None, ip_address=None): + def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, fdbcli_binary: str, + process_number: int, create_config=True, port=None, ip_address=None): self.basedir = Path(basedir) + self.etc = self.basedir.joinpath('etc') + self.log = self.basedir.joinpath('log') + self.data = self.basedir.joinpath('data') + self.conf_file = self.etc.joinpath('foundationdb.conf') + self.cluster_file = self.etc.joinpath('fdb.cluster') self.fdbserver_binary = Path(fdbserver_binary) self.fdbmonitor_binary = Path(fdbmonitor_binary) self.fdbcli_binary = Path(fdbcli_binary) for b in (self.fdbserver_binary, self.fdbmonitor_binary, self.fdbcli_binary): assert b.exists(), "{} does not exist".format(b) - if not self.basedir.exists(): - self.basedir.mkdir() - self.etc = self.basedir.joinpath('etc') - self.log = self.basedir.joinpath('log') - self.data = self.basedir.joinpath('data') self.etc.mkdir(exist_ok=True) self.log.mkdir(exist_ok=True) self.data.mkdir(exist_ok=True) - self.port = get_free_port() if port is None else port + self.process_number = process_number self.ip_address = '127.0.0.1' if ip_address is None else ip_address + self.first_port = port + if (self.first_port is not None): + self.last_used_port = 
int(self.first_port)-1 + self.server_ports = [self.__next_port() + for _ in range(self.process_number)] + self.cluster_desc = random_secret_string(8) + self.cluster_secret = random_secret_string(8) + self.env_vars = {} self.running = False self.process = None self.fdbmonitor_logfile = None + self.use_legacy_conf_syntax = False if create_config: - with open(self.etc.joinpath('fdb.cluster'), 'x') as f: - random_string = lambda len : ''.join(random.choice(LocalCluster.valid_letters_for_secret) for i in range(len)) - f.write('{desc}:{secret}@{ip_addr}:{server_port}'.format( - desc=random_string(8), - secret=random_string(8), - ip_addr=self.ip_address, - server_port=self.port - )) - with open(self.etc.joinpath('foundationdb.conf'), 'x') as f: - f.write(LocalCluster.configuration_template.format( - etcdir=self.etc, - fdbserver_bin=self.fdbserver_binary, - datadir=self.data, - logdir=self.log - )) - # By default, the cluster only has one process - # If a port number is given and process_number > 1, we will use subsequent numbers - # E.g., port = 4000, process_number = 5 - # Then 4000,4001,4002,4003,4004 will be used as ports - # If port number is not given, we will randomly pick free ports - for index, _ in enumerate(range(process_number)): - f.write('[fdbserver.{server_port}]\n'.format(server_port=self.port)) - self.port = get_free_port() if port is None else str(int(self.port) + 1) + self.create_cluster_file() + self.save_config() - def __enter__(self): + def __next_port(self): + if (self.first_port is None): + return get_free_port() + else: + self.last_used_port += 1 + return self.last_used_port + + def save_config(self): + new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new') + with open(new_conf_file, 'x') as f: + conf_template = LocalCluster.configuration_template + if (self.use_legacy_conf_syntax): + conf_template = conf_template.replace("-", "_") + f.write(conf_template.format( + etcdir=self.etc, + fdbserver_bin=self.fdbserver_binary, + 
datadir=self.data, + logdir=self.log, + ip_address=self.ip_address + )) + # By default, the cluster only has one process + # If a port number is given and process_number > 1, we will use subsequent numbers + # E.g., port = 4000, process_number = 5 + # Then 4000,4001,4002,4003,4004 will be used as ports + # If port number is not given, we will randomly pick free ports + for port in self.server_ports: + f.write('[fdbserver.{server_port}]\n'.format( + server_port=port)) + f.flush() + os.fsync(f.fileno()) + + os.replace(new_conf_file, self.conf_file) + + def create_cluster_file(self): + with open(self.cluster_file, 'x') as f: + f.write('{desc}:{secret}@{ip_addr}:{server_port}'.format( + desc=self.cluster_desc, + secret=self.cluster_secret, + ip_addr=self.ip_address, + server_port=self.server_ports[0] + )) + + def start_cluster(self): assert not self.running, "Can't start a server that is already running" args = [str(self.fdbmonitor_binary), '--conffile', str(self.etc.joinpath('foundationdb.conf')), '--lockfile', str(self.etc.joinpath('fdbmonitor.lock'))] - self.fdbmonitor_logfile = open(self.log.joinpath('fdbmonitor.log'), 'w') - self.process = subprocess.Popen(args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile) + self.fdbmonitor_logfile = open( + self.log.joinpath('fdbmonitor.log'), 'w') + self.process = subprocess.Popen( + args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile, env=self.process_env()) self.running = True - return self - def __exit__(self, xc_type, exc_value, traceback): + def stop_cluster(self): assert self.running, "Server is not running" if self.process.poll() is None: self.process.terminate() self.running = False - def create_database(self, storage='ssd'): - args = [self.fdbcli_binary, '-C', self.etc.joinpath('fdb.cluster'), '--exec', - 'configure new single {} tenant_mode=optional_experimental'.format(storage)] - subprocess.run(args) + def ensure_ports_released(self, timeout_sec=5): + sec = 0 + while (sec < 
timeout_sec): + in_use = False + for port in self.server_ports: + if is_port_in_use(port): + print("Port {} in use. Waiting for it to be released".format(port)) + in_use = True + break + if not in_use: + return + time.sleep(0.5) + sec += 0.5 + assert False, "Failed to release ports in {}s".format(timeout_sec) + + def __enter__(self): + self.start_cluster() + return self + + def __exit__(self, xc_type, exc_value, traceback): + self.stop_cluster() + + def create_database(self, storage='ssd', enable_tenants=True): + db_config = 'configure new single {}'.format(storage) + if (enable_tenants): + db_config += " tenant_mode=optional_experimental" + args = [self.fdbcli_binary, '-C', + self.cluster_file, '--exec', db_config] + res = subprocess.run(args, env=self.process_env()) + assert res.returncode == 0, "Create database failed with {}".format( + res.returncode) + + def get_status(self): + args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec', + 'status json'] + res = subprocess.run(args, env=self.process_env(), + stdout=subprocess.PIPE) + assert res.returncode == 0, "Get status failed with {}".format( + res.returncode) + return json.loads(res.stdout) + + def process_env(self): + env = dict(os.environ) + env.update(self.env_vars) + return env + + def set_env_var(self, var_name, var_val): + self.env_vars[var_name] = var_val diff --git a/tests/TestRunner/tmp_cluster.py b/tests/TestRunner/tmp_cluster.py index 21ec4547ad..faa221f6da 100755 --- a/tests/TestRunner/tmp_cluster.py +++ b/tests/TestRunner/tmp_cluster.py @@ -5,9 +5,8 @@ import os import shutil import subprocess import sys -from local_cluster import LocalCluster +from local_cluster import LocalCluster, random_secret_string from argparse import ArgumentParser, RawDescriptionHelpFormatter -from random import choice from pathlib import Path @@ -18,8 +17,7 @@ class TempCluster: assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir) tmp_dir = self.build_dir.joinpath( "tmp", - 
"".join(choice(LocalCluster.valid_letters_for_secret) - for i in range(16)), + random_secret_string(16) ) tmp_dir.mkdir(parents=True) self.cluster = LocalCluster( diff --git a/tests/TestRunner/upgrade_test.py b/tests/TestRunner/upgrade_test.py new file mode 100755 index 0000000000..a4a201e3db --- /dev/null +++ b/tests/TestRunner/upgrade_test.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 + +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import glob +import os +from pathlib import Path +import platform +import random +import shutil +import stat +import subprocess +import sys +from threading import Thread, Event +import traceback +import time +from urllib import request + +from local_cluster import LocalCluster, random_secret_string + + +SUPPORTED_PLATFORMS = ["x86_64"] +SUPPORTED_VERSIONS = ["7.2.0", "7.1.0", "7.0.0", "6.3.24", "6.3.23", + "6.3.22", "6.3.18", "6.3.17", "6.3.16", "6.3.15", "6.3.13", "6.3.12", "6.3.9", "6.2.30", + "6.2.29", "6.2.28", "6.2.27", "6.2.26", "6.2.25", "6.2.24", "6.2.23", "6.2.22", "6.2.21", + "6.2.20", "6.2.19", "6.2.18", "6.2.17", "6.2.16", "6.2.15", "6.2.10", "6.1.13", "6.1.12", + "6.1.11", "6.1.10", "6.0.18", "6.0.17", "6.0.16", "6.0.15", "6.0.14", "5.2.8", "5.2.7", + "5.1.7", "5.1.6"] +FDB_DOWNLOAD_ROOT = "https://github.com/apple/foundationdb/releases/download/" +CURRENT_VERSION = "7.2.0" +HEALTH_CHECK_TIMEOUT_SEC = 5 +PROGRESS_CHECK_TIMEOUT_SEC = 30 +TRANSACTION_RETRY_LIMIT = 100 +RUN_WITH_GDB = False + + +def make_executable(path): + st = os.stat(path) + os.chmod(path, st.st_mode | stat.S_IEXEC) + + +def remove_file_no_fail(filename): + try: + os.remove(filename) + except OSError: + pass + + +def version_from_str(ver_str): + ver = [int(s) for s in ver_str.split(".")] + assert len(ver) == 3, "Invalid version string {}".format(ver_str) + return ver + + +def api_version_from_str(ver_str): + ver_tuple = version_from_str(ver_str) + return ver_tuple[0]*100+ver_tuple[1]*10 + + +def version_before(ver_str1, ver_str2): + 
return version_from_str(ver_str1) < version_from_str(ver_str2) + + +def random_sleep(minSec, maxSec): + timeSec = random.uniform(minSec, maxSec) + print("Sleeping for {0:.3f}s".format(timeSec)) + time.sleep(timeSec) + + +class UpgradeTest: + def __init__(self, build_dir: str, upgrade_path: list, process_number: int = 1, port: str = None): + self.build_dir = Path(build_dir).resolve() + assert self.build_dir.exists(), "{} does not exist".format(build_dir) + assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir) + self.upgrade_path = upgrade_path + for version in upgrade_path: + assert version in SUPPORTED_VERSIONS, "Unsupported version {}".format( + version) + self.platform = platform.machine() + assert self.platform in SUPPORTED_PLATFORMS, "Unsupported platform {}".format( + self.platform) + self.tmp_dir = self.build_dir.joinpath( + "tmp", + random_secret_string(16) + ) + self.tmp_dir.mkdir(parents=True) + self.download_dir = self.build_dir.joinpath( + "tmp", + "old_binaries" + ) + self.download_old_binaries() + self.create_external_lib_dir() + init_version = upgrade_path[0] + self.cluster = LocalCluster( + self.tmp_dir, + self.binary_path(init_version, "fdbserver"), + self.binary_path(init_version, "fdbmonitor"), + self.binary_path(init_version, "fdbcli"), + process_number, + port=port, + create_config=False + ) + self.cluster.create_cluster_file() + self.configure_version(init_version) + self.log = self.cluster.log + self.etc = self.cluster.etc + self.data = self.cluster.data + self.input_pipe_path = self.tmp_dir.joinpath( + "input.{}".format(random_secret_string(8))) + self.output_pipe_path = self.tmp_dir.joinpath( + "output.{}".format(random_secret_string(8))) + os.mkfifo(self.input_pipe_path) + os.mkfifo(self.output_pipe_path) + self.progress_event = Event() + + def binary_path(self, version, bin_name): + if version == CURRENT_VERSION: + return self.build_dir.joinpath("bin", bin_name) + else: + return self.download_dir.joinpath(version, 
bin_name) + + def lib_dir(self, version): + if version == CURRENT_VERSION: + return self.build_dir.joinpath("lib") + else: + return self.download_dir.joinpath(version) + + # Download an old binary of a given version from a remote repository + def download_old_binary(self, version, target_bin_name, remote_bin_name, makeExecutable): + local_file = self.binary_path(version, target_bin_name) + if (local_file.exists()): + return + self.download_dir.joinpath(version).mkdir( + parents=True, exist_ok=True) + remote_file = "{}{}/{}".format(FDB_DOWNLOAD_ROOT, + version, remote_bin_name) + print("Downloading '{}' to '{}'...".format(remote_file, local_file)) + request.urlretrieve(remote_file, local_file) + print("Download complete") + assert local_file.exists(), "{} does not exist".format(local_file) + if makeExecutable: + make_executable(local_file) + + # Download all old binaries required for testing the specified upgrade path + def download_old_binaries(self): + for version in self.upgrade_path: + if version == CURRENT_VERSION: + continue + self.download_old_binary(version, + "fdbserver", "fdbserver.{}".format(self.platform), True) + self.download_old_binary(version, + "fdbmonitor", "fdbmonitor.{}".format(self.platform), True) + self.download_old_binary(version, + "fdbcli", "fdbcli.{}".format(self.platform), True) + self.download_old_binary(version, + "libfdb_c.so", "libfdb_c.{}.so".format(self.platform), False) + + # Create a directory for external client libraries for MVC and fill it + # with the libraries necessary for the specified upgrade path + def create_external_lib_dir(self): + self.external_lib_dir = self.tmp_dir.joinpath("client_libs") + self.external_lib_dir.mkdir(parents=True) + for version in self.upgrade_path: + src_file_path = self.lib_dir(version).joinpath("libfdb_c.so") + assert src_file_path.exists(), "{} does not exist".format(src_file_path) + target_file_path = self.external_lib_dir.joinpath( + "libfdb_c.{}.so".format(version)) + 
shutil.copyfile(src_file_path, target_file_path) + + # Perform a health check of the cluster: Use fdbcli status command to check if the number of + # server processes and their versions are as expected + def health_check(self, timeout_sec=HEALTH_CHECK_TIMEOUT_SEC): + retries = 0 + while retries < timeout_sec: + retries += 1 + status = self.cluster.get_status() + if "processes" not in status["cluster"]: + print("Health check: no processes found. Retrying") + time.sleep(1) + continue + num_proc = len(status["cluster"]["processes"]) + if (num_proc < self.cluster.process_number): + print("Health check: {} of {} processes found. Retrying".format( + num_proc, self.cluster.process_number)) + time.sleep(1) + continue + assert num_proc == self.cluster.process_number, "Number of processes: expected: {}, actual: {}".format( + self.cluster.process_number, num_proc) + for (_, proc_stat) in status["cluster"]["processes"].items(): + proc_ver = proc_stat["version"] + assert proc_ver == self.cluster_version, "Process version: expected: {}, actual: {}".format( + self.cluster_version, proc_ver) + print("Health check: OK") + return + assert False, "Health check: Failed" + + # Create and save a cluster configuration for the given version + def configure_version(self, version): + self.cluster.fdbmonitor_binary = self.binary_path( + version, "fdbmonitor") + self.cluster.fdbserver_binary = self.binary_path(version, "fdbserver") + self.cluster.fdbcli_binary = self.binary_path(version, "fdbcli") + self.cluster.set_env_var("LD_LIBRARY_PATH", str(self.lib_dir(version))) + # Use the legacy configuration syntax only for versions before 7.1.0, and switch it back off for newer ones + self.cluster.use_legacy_conf_syntax = version_before(version, "7.1.0") + self.cluster.save_config() + self.cluster_version = version + + # Upgrade the cluster to the given version + def upgrade_to(self, version): + print("Upgrading to version {}".format(version)) + self.cluster.stop_cluster() + self.configure_version(version) + self.cluster.ensure_ports_released() + self.cluster.start_cluster() + print("Upgraded to 
{}".format(version)) + + def __enter__(self): + print("Starting cluster version {}".format(self.cluster_version)) + self.cluster.start_cluster() + self.cluster.create_database(enable_tenants=False) + return self + + def __exit__(self, xc_type, exc_value, traceback): + self.cluster.stop_cluster() + shutil.rmtree(self.tmp_dir) + + # Determine FDB API version matching the upgrade path + def determine_api_version(self): + self.api_version = api_version_from_str(CURRENT_VERSION) + for version in self.upgrade_path: + self.api_version = min( + api_version_from_str(version), self.api_version) + + # Start the tester to generate the workload specified by the test file + def exec_workload(self, test_file): + self.tester_retcode = 1 + try: + self.determine_api_version() + cmd_args = [self.tester_bin, + '--cluster-file', self.cluster.cluster_file, + '--test-file', test_file, + '--external-client-dir', self.external_lib_dir, + '--disable-local-client', + '--input-pipe', self.input_pipe_path, + '--output-pipe', self.output_pipe_path, + '--api-version', str(self.api_version), + '--log', + '--log-dir', self.log, + '--transaction-retry-limit', str(TRANSACTION_RETRY_LIMIT)] + if (RUN_WITH_GDB): + cmd_args = ['gdb', '-ex', 'run', '--args'] + cmd_args + print("Executing test command: {}".format( + " ".join([str(c) for c in cmd_args]))) + + self.tester_proc = subprocess.Popen( + cmd_args, stdout=sys.stdout, stderr=sys.stderr) + self.tester_retcode = self.tester_proc.wait() + self.tester_proc = None + + if (self.tester_retcode != 0): + print("Tester failed with return code {}".format( + self.tester_retcode)) + except Exception: + print("Execution of test workload failed") + print(traceback.format_exc()) + + # Perform a progress check: Trigger it and wait until it is completed + + def progress_check(self, ctrl_pipe): + self.progress_event.clear() + os.write(ctrl_pipe, b"CHECK\n") + self.progress_event.wait( + None if RUN_WITH_GDB else PROGRESS_CHECK_TIMEOUT_SEC) + if 
(self.progress_event.is_set()): + print("Progress check: OK") + else: + assert False, "Progress check failed after upgrade to version {}".format( + self.cluster_version) + + # The main function of a thread for reading and processing + # the notifications received from the tester + def output_pipe_reader(self): + try: + print("Opening pipe {} for reading".format(self.output_pipe_path)) + self.output_pipe = open(self.output_pipe_path, 'r') + for line in self.output_pipe: + msg = line.strip() + print("Received {}".format(msg)) + if (msg == "CHECK_OK"): + self.progress_event.set() + self.output_pipe.close() + except Exception as e: + print("Error while reading output pipe", e) + print(traceback.format_exc()) + + # Execute the upgrade test workflow according to the specified + # upgrade path: perform the upgrade steps and check success after each step + def exec_upgrade_test(self): + print("Opening pipe {} for writing".format(self.input_pipe_path)) + ctrl_pipe = os.open(self.input_pipe_path, os.O_WRONLY) + try: + self.health_check() + self.progress_check(ctrl_pipe) + for version in self.upgrade_path[1:]: + random_sleep(0.0, 2.0) + self.upgrade_to(version) + self.health_check() + self.progress_check(ctrl_pipe) + os.write(ctrl_pipe, b"STOP\n") + finally: + os.close(ctrl_pipe) + + # Kill the tester process if it is still alive + def kill_tester_if_alive(self, workload_thread): + if not workload_thread.is_alive(): + return + if self.tester_proc is not None: + try: + print("Killing the tester process") + self.tester_proc.kill() + workload_thread.join(5) + except: + print("Failed to kill the tester process") + + # The main method implementing the test: + # - Start a thread for generating the workload using a tester binary + # - Start a thread for reading notifications from the tester + # - Trigger the upgrade steps and checks in the main thread + def exec_test(self, args): + self.tester_bin = self.build_dir.joinpath("bin", "fdb_c_api_tester") + assert 
self.tester_bin.exists(), "{} does not exist".format(self.tester_bin) + self.tester_proc = None + test_retcode = 1 + try: + workload_thread = Thread( + target=self.exec_workload, args=(args.test_file,)) + workload_thread.start() + + reader_thread = Thread(target=self.output_pipe_reader) + reader_thread.start() + + self.exec_upgrade_test() + test_retcode = 0 + except Exception: + print("Upgrade test failed") + print(traceback.format_exc()) + self.kill_tester_if_alive(workload_thread) + finally: + workload_thread.join(5) + reader_thread.join(5) + self.kill_tester_if_alive(workload_thread) + if test_retcode == 0: + test_retcode = self.tester_retcode + return test_retcode + + # Check the cluster log for errors + def check_cluster_logs(self, error_limit=100): + sev40s = ( + subprocess.getoutput( + "grep -r 'Severity=\"40\"' {}".format( + self.cluster.log.as_posix()) + ) + .rstrip() + .splitlines() + ) + + err_cnt = 0 + for line in sev40s: + # When running ASAN we expect to see this message. Boost coroutine should be using the + # correct asan annotations so that it shouldn't produce any false positives. + if line.endswith( + "WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases!" 
+ ): + continue + if (err_cnt < error_limit): + print(line) + err_cnt += 1 + + if err_cnt > 0: + print( + ">>>>>>>>>>>>>>>>>>>> Found {} severity 40 events - the test fails", err_cnt) + else: + print("No error found in logs") + return err_cnt == 0 + + # Dump the last cluster configuration and cluster logs + def dump_cluster_logs(self): + for etc_file in glob.glob(os.path.join(self.cluster.etc, "*")): + print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(etc_file)) + with open(etc_file, "r") as f: + print(f.read()) + for log_file in glob.glob(os.path.join(self.cluster.log, "*")): + print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(log_file)) + with open(log_file, "r") as f: + print(f.read()) + + +if __name__ == "__main__": + parser = ArgumentParser( + formatter_class=RawDescriptionHelpFormatter, + description=""" + A script for testing FDB multi-version client in upgrade scenarios. Creates a local cluster, + generates a workload using fdb_c_api_tester with a specified test file, and performs + cluster upgrade according to the specified upgrade path. Checks if the workload successfully + progresses after each upgrade step. 
+ """, + ) + parser.add_argument( + "--build-dir", + "-b", + metavar="BUILD_DIRECTORY", + help="FDB build directory", + required=True, + ) + parser.add_argument( + '--upgrade-path', + nargs='+', + help='Cluster upgrade path: a space separated list of versions', + default=[CURRENT_VERSION] + ) + parser.add_argument( + '--test-file', + help='A .toml file describing a test workload to be generated with fdb_c_api_tester', + required=True, + ) + parser.add_argument( + "--process-number", + "-p", + help="Number of fdb processes running (default: 0 - random)", + type=int, + default=0, + ) + parser.add_argument( + '--disable-log-dump', + help='Do not dump cluster log on error', + action="store_true" + ) + parser.add_argument( + '--run-with-gdb', + help='Execute the tester binary from gdb', + action="store_true" + ) + args = parser.parse_args() + if (args.process_number == 0): + args.process_number = random.randint(1, 5) + print("Testing with {} processes".format(args.process_number)) + + if (args.run_with_gdb): + RUN_WITH_GDB = True + + errcode = 1 + with UpgradeTest(args.build_dir, args.upgrade_path, args.process_number) as test: + print("log-dir: {}".format(test.log)) + print("etc-dir: {}".format(test.etc)) + print("data-dir: {}".format(test.data)) + print("cluster-file: {}".format(test.etc.joinpath("fdb.cluster"))) + errcode = test.exec_test(args) + if not test.check_cluster_logs(): + errcode = 1 if errcode == 0 else errcode + if errcode != 0 and not args.disable_log_dump: + test.dump_cluster_logs() + + sys.exit(errcode)