Merge pull request #6651 from sfc-gh-vgasiunas/vgasiunas-upgrade-test

Automated end-to-end upgrade tests
commit 218ab6377c by A.J. Beamon, 2022-04-14 09:36:52 -07:00 (committed via GitHub)
22 changed files with 1249 additions and 156 deletions

View File

@ -252,6 +252,25 @@ endif()
--test-dir
${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests
)
add_test(NAME fdb_c_upgrade_single_threaded
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "6.3.23" "7.0.0" "6.3.23"
--process-number 1
)
add_test(NAME fdb_c_upgrade_multi_threaded
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "6.3.23"
--process-number 3
)
endif()
set(c_workloads_srcs
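
For reference, each registered test boils down to a direct invocation of the new test runner (with <BUILD_DIR> standing in for the CMake build directory and paths relative to the source tree):

tests/TestRunner/upgrade_test.py --build-dir <BUILD_DIR> --disable-log-dump --test-file bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml --upgrade-path "6.3.23" "7.0.0" "6.3.23" --process-number 1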

View File

@ -32,7 +32,33 @@ ApiWorkload::ApiWorkload(const WorkloadConfig& config) : WorkloadBase(config) {
maxKeysPerTransaction = config.getIntOption("maxKeysPerTransaction", 50);
initialSize = config.getIntOption("initialSize", 1000);
readExistingKeysRatio = config.getFloatOption("readExistingKeysRatio", 0.9);
runUntilStop = config.getBoolOption("runUntilStop", false);
numRandomOperations = config.getIntOption("numRandomOperations", 1000);
numOperationsForProgressCheck = config.getIntOption("numOperationsForProgressCheck", 10);
keyPrefix = fmt::format("{}/", workloadId);
numRandomOpLeft = 0;
stopReceived = false;
checkingProgress = false;
apiVersion = config.apiVersion;
}
IWorkloadControlIfc* ApiWorkload::getControlIfc() {
if (runUntilStop) {
return this;
} else {
return nullptr;
}
}
void ApiWorkload::stop() {
ASSERT(runUntilStop);
stopReceived = true;
}
void ApiWorkload::checkProgress() {
ASSERT(runUntilStop);
numRandomOpLeft = numOperationsForProgressCheck;
checkingProgress = true;
}
void ApiWorkload::start() {
@ -48,6 +74,37 @@ void ApiWorkload::start() {
});
}
void ApiWorkload::runTests() {
if (!runUntilStop) {
numRandomOpLeft = numRandomOperations;
}
randomOperations();
}
void ApiWorkload::randomOperations() {
if (runUntilStop) {
if (stopReceived)
return;
if (checkingProgress) {
int numLeft = numRandomOpLeft--;
if (numLeft == 0) {
checkingProgress = false;
confirmProgress();
}
}
} else {
int numLeft = numRandomOpLeft--;
if (numLeft == 0)
return;
}
randomOperation([this]() { randomOperations(); });
}
void ApiWorkload::randomOperation(TTaskFct cont) {
// Must be overridden if used
ASSERT(false);
}
std::string ApiWorkload::randomKeyName() {
return keyPrefix + Random::get().randomStringLowerCase(minKeyLength, maxKeyLength);
}
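
The loop above runs in one of two modes: with runUntilStop=false it executes a fixed budget of numRandomOperations; with runUntilStop=true it runs until a STOP command arrives and, upon a CHECK command, counts down numOperationsForProgressCheck operations before confirming progress. A minimal synchronous Python sketch of the same accounting (hypothetical class name; the real code chains continuations through a scheduler):

class RandomOpLoop:
    def __init__(self, run_until_stop, num_random_operations=1000,
                 num_ops_for_progress_check=10):
        self.run_until_stop = run_until_stop
        self.num_ops_for_progress_check = num_ops_for_progress_check
        self.num_ops_left = 0 if run_until_stop else num_random_operations
        self.stop_received = False
        self.checking_progress = False

    def stop(self):  # STOP command received
        self.stop_received = True

    def check_progress(self):  # CHECK command received
        self.num_ops_left = self.num_ops_for_progress_check
        self.checking_progress = True

    def run(self, do_random_operation):
        while True:
            if self.run_until_stop:
                if self.stop_received:
                    return
                if self.checking_progress:
                    num_left = self.num_ops_left
                    self.num_ops_left -= 1
                    if num_left == 0:
                        self.checking_progress = False
                        print("Progress confirmed")  # confirmProgress()
            else:
                num_left = self.num_ops_left
                self.num_ops_left -= 1
                if num_left == 0:
                    return
            do_random_operation()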

View File

@ -23,6 +23,7 @@
#include "TesterWorkload.h"
#include "TesterKeyValueStore.h"
#include <atomic>
namespace FdbApiTester {
@ -30,14 +31,28 @@ namespace FdbApiTester {
* Base class for implementing API testing workloads.
* Provides various helper methods and reusable configuration parameters
*/
class ApiWorkload : public WorkloadBase {
class ApiWorkload : public WorkloadBase, IWorkloadControlIfc {
public:
void start() override;
// Method to be overridden to run specific tests
virtual void runTests() = 0;
IWorkloadControlIfc* getControlIfc() override;
virtual void stop() override;
virtual void checkProgress() override;
// Running specific tests
// The default implementation generates a workload consisting of
// random operations generated by randomOperation
virtual void runTests();
// Generate a random operation and schedule the continuation when done
virtual void randomOperation(TTaskFct cont);
protected:
// Selected FDB API version
int apiVersion;
// The minimum length of a key
int minKeyLength;
@ -59,6 +74,25 @@ protected:
// The ratio of reading existing keys
double readExistingKeysRatio;
// Run the workload until explicit stop
bool runUntilStop;
// The number of operations to be executed (for runUntilStop=false)
int numRandomOperations;
// The number of transactions to be completed for
// a successful test progress check
int numOperationsForProgressCheck;
// Stop command received (for runUntilStop=true)
std::atomic<bool> stopReceived;
// Progress check is active (for runUntilStop=true)
std::atomic<bool> checkingProgress;
// Number of random operations left (for runUntilStop=false)
std::atomic<int> numRandomOpLeft;
// Key prefix
std::string keyPrefix;
@ -82,6 +116,8 @@ protected:
private:
void populateDataTx(TTaskFct cont);
void randomOperations();
};
} // namespace FdbApiTester

View File

@ -28,7 +28,7 @@ namespace {
void fdb_check(fdb_error_t e) {
if (e) {
fmt::print(stderr, "Unexpected error: %s\n", fdb_get_error(e));
fmt::print(stderr, "Unexpected error: {}\n", fdb_get_error(e));
std::abort();
}
}

View File

@ -24,22 +24,11 @@ namespace FdbApiTester {
class CancelTransactionWorkload : public ApiWorkload {
public:
CancelTransactionWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
numRandomOperations = config.getIntOption("numRandomOperations", 1000);
numOpLeft = numRandomOperations;
}
void runTests() override { randomOperations(); }
CancelTransactionWorkload(const WorkloadConfig& config) : ApiWorkload(config) {}
private:
enum OpType { OP_CANCEL_GET, OP_CANCEL_AFTER_FIRST_GET, OP_LAST = OP_CANCEL_AFTER_FIRST_GET };
// The number of operations to be executed
int numRandomOperations;
// Operations counter
int numOpLeft;
// Start multiple concurrent gets and cancel the transaction
void randomCancelGetTx(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
@ -87,7 +76,7 @@ private:
[this, cont]() { schedule(cont); });
}
void randomOperation(TTaskFct cont) {
void randomOperation(TTaskFct cont) override {
OpType txType = (OpType)Random::get().randomInt(0, OP_LAST);
switch (txType) {
case OP_CANCEL_GET:
@ -98,14 +87,6 @@ private:
break;
}
}
void randomOperations() {
if (numOpLeft == 0)
return;
numOpLeft--;
randomOperation([this]() { randomOperations(); });
}
};
WorkloadFactory<CancelTransactionWorkload> MiscTestWorkloadFactory("CancelTransaction");

View File

@ -26,22 +26,11 @@ namespace FdbApiTester {
class ApiCorrectnessWorkload : public ApiWorkload {
public:
ApiCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {
numRandomOperations = config.getIntOption("numRandomOperations", 1000);
numOpLeft = numRandomOperations;
}
void runTests() override { randomOperations(); }
ApiCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {}
private:
enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
// The number of operations to be executed
int numRandomOperations;
// Operations counter
int numOpLeft;
void randomInsertOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
auto kvPairs = std::make_shared<std::vector<KeyValue>>();
@ -82,8 +71,11 @@ private:
}
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
execTransaction(
[kvPairs, results](auto ctx) {
ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
[kvPairs, results, this](auto ctx) {
if (apiVersion >= 710) {
// Test GRV caching in 7.1 and later
ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
}
auto futures = std::make_shared<std::vector<Future>>();
for (const auto& kv : *kvPairs) {
futures->push_back(ctx->tx()->get(kv.key, false));
@ -211,14 +203,6 @@ private:
break;
}
}
void randomOperations() {
if (numOpLeft == 0)
return;
numOpLeft--;
randomOperation([this]() { randomOperations(); });
}
};
WorkloadFactory<ApiCorrectnessWorkload> ApiCorrectnessWorkloadFactory("ApiCorrectness");

View File

@ -29,13 +29,20 @@ namespace FdbApiTester {
class TesterOptions {
public:
// FDB API version, using the latest version by default
int apiVersion = FDB_API_VERSION;
std::string clusterFile;
bool trace = false;
std::string traceDir;
std::string traceFormat;
std::string traceFormat = "xml";
std::string logGroup;
std::string externalClientLibrary;
std::string externalClientDir;
bool disableLocalClient = false;
std::string testFile;
std::string inputPipeName;
std::string outputPipeName;
int transactionRetryLimit = 0;
int numFdbThreads;
int numClientThreads;
int numDatabases;

View File

@ -45,10 +45,6 @@ std::unordered_map<std::string, std::function<void(const std::string& value, Tes
[](const std::string& value, TestSpec* spec) { //
spec->title = value;
} },
{ "apiVersion",
[](const std::string& value, TestSpec* spec) { //
processIntOption(value, "apiVersion", spec->apiVersion, 700, 720);
} },
{ "blockOnFutures",
[](const std::string& value, TestSpec* spec) { //
spec->blockOnFutures = (value == "true");

View File

@ -42,9 +42,6 @@ struct TestSpec {
// Title of the test
std::string title;
// FDB API version, using the latest version by default
int apiVersion = FDB_API_VERSION;
// Use blocking waits on futures instead of scheduling callbacks
bool blockOnFutures = false;

View File

@ -20,8 +20,10 @@
#include "TesterTransactionExecutor.h"
#include "TesterUtil.h"
#include "foundationdb/fdb_c_types.h"
#include "test/apitester/TesterScheduler.h"
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <mutex>
#include <atomic>
@ -31,6 +33,9 @@
namespace FdbApiTester {
constexpr int LONG_WAIT_TIME_US = 1000000;
constexpr int LARGE_NUMBER_OF_RETRIES = 5;
void TransactionActorBase::complete(fdb_error_t err) {
error = err;
context = {};
@ -69,8 +74,10 @@ public:
TransactionContextBase(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), txState(TxState::IN_PROGRESS) {}
IScheduler* scheduler,
int retryLimit)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false) {}
// A state machine:
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
@ -87,6 +94,7 @@ public:
if (txState != TxState::IN_PROGRESS) {
return;
}
commitCalled = true;
lock.unlock();
Future f = fdbTx.commit();
auto thisRef = shared_from_this();
@ -102,6 +110,11 @@ public:
}
txState = TxState::DONE;
lock.unlock();
if (retriedErrors.size() >= LARGE_NUMBER_OF_RETRIES) {
fmt::print("Transaction succeeded after {} retries on errors: {}\n",
retriedErrors.size(),
fmt::join(retriedErrors, ", "));
}
// cancel transaction so that any pending operations on it
// fail gracefully
fdbTx.cancel();
@ -146,11 +159,29 @@ protected:
} else {
std::unique_lock<std::mutex> lock(mutex);
txState = TxState::IN_PROGRESS;
commitCalled = false;
lock.unlock();
txActor->start();
}
}
// Checks if a transaction can be retried. Fails the transaction if the check fails
bool canRetry(fdb_error_t lastErr) {
ASSERT(txState == TxState::ON_ERROR);
retriedErrors.push_back(lastErr);
if (retryLimit == 0 || retriedErrors.size() <= retryLimit) {
if (retriedErrors.size() == LARGE_NUMBER_OF_RETRIES) {
fmt::print("Transaction already retried {} times, on errors: {}\n",
retriedErrors.size(),
fmt::join(retriedErrors, ", "));
}
return true;
}
fmt::print("Transaction retry limit reached. Retried on errors: {}\n", fmt::join(retriedErrors, ", "));
transactionFailed(lastErr);
return false;
}
// FDB transaction
Transaction fdbTx;
@ -166,11 +197,26 @@ protected:
// Reference to the scheduler
IScheduler* scheduler;
// Retry limit
int retryLimit;
// Transaction execution state
TxState txState;
// onError future used in ON_ERROR state
Future onErrorFuture;
// The error code on which onError was called
fdb_error_t onErrorArg;
// The time point of calling onError
TimePoint onErrorCallTimePoint;
// Transaction is committed or being committed
bool commitCalled;
// A history of errors on which the transaction was retried
std::vector<fdb_error_t> retriedErrors;
};
/**
@ -181,8 +227,9 @@ public:
BlockingTransactionContext(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler)
: TransactionContextBase(tx, txActor, cont, scheduler) {}
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -197,12 +244,21 @@ protected:
return;
}
lock.unlock();
auto start = timeNow();
fdb_error_t err = fdb_future_block_until_ready(f.fdbFuture());
if (err) {
transactionFailed(err);
return;
}
err = f.getError();
auto waitTimeUs = timeElapsedInUs(start);
if (waitTimeUs > LONG_WAIT_TIME_US) {
fmt::print("Long waiting time on a future: {:.3f}s, return code {} ({}), commit called: {}\n",
microsecToSec(waitTimeUs),
err,
fdb_get_error(err),
commitCalled);
}
if (err == error_code_transaction_cancelled) {
return;
}
@ -223,13 +279,29 @@ protected:
txState = TxState::ON_ERROR;
lock.unlock();
if (!canRetry(err)) {
return;
}
ASSERT(!onErrorFuture);
onErrorFuture = fdbTx.onError(err);
onErrorArg = err;
auto start = timeNow();
fdb_error_t err2 = fdb_future_block_until_ready(onErrorFuture.fdbFuture());
if (err2) {
transactionFailed(err2);
return;
}
auto waitTimeUs = timeElapsedInUs(start);
if (waitTimeUs > LONG_WAIT_TIME_US) {
fdb_error_t err3 = onErrorFuture.getError();
fmt::print("Long waiting time on onError({}) future: {:.3f}s, return code {} ({})\n",
onErrorArg,
microsecToSec(waitTimeUs),
err3,
fdb_get_error(err3));
}
auto thisRef = std::static_pointer_cast<BlockingTransactionContext>(shared_from_this());
scheduler->schedule([thisRef]() { thisRef->handleOnErrorResult(); });
}
@ -243,8 +315,9 @@ public:
AsyncTransactionContext(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler)
: TransactionContextBase(tx, txActor, cont, scheduler) {}
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -252,7 +325,7 @@ protected:
if (txState != TxState::IN_PROGRESS) {
return;
}
callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError };
callbackMap[f.fdbFuture()] = CallbackInfo{ f, cont, shared_from_this(), retryOnError, timeNow() };
lock.unlock();
fdb_error_t err = fdb_future_set_callback(f.fdbFuture(), futureReadyCallback, this);
if (err) {
@ -264,11 +337,20 @@ protected:
}
static void futureReadyCallback(FDBFuture* f, void* param) {
AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param;
txCtx->onFutureReady(f);
try {
AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param;
txCtx->onFutureReady(f);
} catch (std::runtime_error& err) {
fmt::print("Unexpected exception in callback {}\n", err.what());
abort();
} catch (...) {
fmt::print("Unknown error in callback\n");
abort();
}
}
void onFutureReady(FDBFuture* f) {
auto endTime = timeNow();
injectRandomSleep();
// Hold a reference to this to avoid it being
// destroyed before releasing the mutex
@ -283,6 +365,13 @@ protected:
}
lock.unlock();
fdb_error_t err = fdb_future_get_error(f);
auto waitTimeUs = timeElapsedInUs(cbInfo.startTime, endTime);
if (waitTimeUs > LONG_WAIT_TIME_US) {
fmt::print("Long waiting time on a future: {:.3f}s, return code {} ({})\n",
microsecToSec(waitTimeUs),
err,
fdb_get_error(err));
}
if (err == error_code_transaction_cancelled) {
return;
}
@ -302,8 +391,14 @@ protected:
txState = TxState::ON_ERROR;
lock.unlock();
if (!canRetry(err)) {
return;
}
ASSERT(!onErrorFuture);
onErrorArg = err;
onErrorFuture = tx()->onError(err);
onErrorCallTimePoint = timeNow();
onErrorThisRef = std::static_pointer_cast<AsyncTransactionContext>(shared_from_this());
fdb_error_t err2 = fdb_future_set_callback(onErrorFuture.fdbFuture(), onErrorReadyCallback, this);
if (err2) {
@ -313,11 +408,28 @@ protected:
}
static void onErrorReadyCallback(FDBFuture* f, void* param) {
AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param;
txCtx->onErrorReady(f);
try {
AsyncTransactionContext* txCtx = (AsyncTransactionContext*)param;
txCtx->onErrorReady(f);
} catch (std::runtime_error& err) {
fmt::print("Unexpected exception in callback {}\n", err.what());
abort();
} catch (...) {
fmt::print("Unknown error in callback\n");
abort();
}
}
void onErrorReady(FDBFuture* f) {
auto waitTimeUs = timeElapsedInUs(onErrorCallTimePoint);
if (waitTimeUs > LONG_WAIT_TIME_US) {
fdb_error_t err = onErrorFuture.getError();
fmt::print("Long waiting time on onError({}): {:.3f}s, return code {} ({})\n",
onErrorArg,
microsecToSec(waitTimeUs),
err,
fdb_get_error(err));
}
injectRandomSleep();
auto thisRef = onErrorThisRef;
onErrorThisRef = {};
@ -353,6 +465,7 @@ protected:
TTaskFct cont;
std::shared_ptr<ITransactionContext> thisRef;
bool retryOnError;
TimePoint startTime;
};
// Map for keeping track of future waits and holding necessary object references
@ -385,9 +498,11 @@ protected:
} else {
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(tx, txActor, cont, scheduler);
ctx = std::make_shared<BlockingTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
} else {
ctx = std::make_shared<AsyncTransactionContext>(tx, txActor, cont, scheduler);
ctx = std::make_shared<AsyncTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
}
txActor->init(ctx);
txActor->start();
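
The retry bookkeeping above caps the number of onError retries and logs transactions that retry suspiciously often. A minimal Python sketch of the same policy (retry_limit == 0 means unlimited, mirroring the C++ canRetry above):

LARGE_NUMBER_OF_RETRIES = 5

def can_retry(retried_errors, retry_limit, last_err):
    # Record the error we are about to retry on
    retried_errors.append(last_err)
    if retry_limit == 0 or len(retried_errors) <= retry_limit:
        if len(retried_errors) == LARGE_NUMBER_OF_RETRIES:
            print("Transaction already retried {} times, on errors: {}".format(
                len(retried_errors), ", ".join(map(str, retried_errors))))
        return True  # keep retrying
    print("Transaction retry limit reached. Retried on errors: {}".format(
        ", ".join(map(str, retried_errors))))
    return False  # give up and fail the transaction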

View File

@ -123,6 +123,9 @@ struct TransactionExecutorOptions {
// The size of the database instance pool
int numDatabases = 1;
// Maximum number of retries per transaction (0 - unlimited)
int transactionRetryLimit = 0;
};
/**

View File

@ -20,9 +20,18 @@
#include "TesterUtil.h"
#include <cstdio>
#include <algorithm>
#include <ctype.h>
#include <chrono>
namespace FdbApiTester {
std::string lowerCase(const std::string& str) {
std::string res = str;
std::transform(res.begin(), res.end(), res.begin(), ::tolower);
return res;
}
Random::Random() {
std::random_device dev;
random.seed(dev());
@ -53,6 +62,7 @@ bool Random::randomBool(double trueRatio) {
void print_internal_error(const char* msg, const char* file, int line) {
fprintf(stderr, "Assertion %s failed @ %s %d:\n", msg, file, line);
fflush(stderr);
}
} // namespace FdbApiTester

View File

@ -27,9 +27,11 @@
#include <ostream>
#include <optional>
#include <fmt/format.h>
#include <chrono>
namespace fmt {
// fmt::format formatting for std::optional<T>
template <typename T>
struct formatter<std::optional<T>> : fmt::formatter<T> {
@ -47,6 +49,8 @@ struct formatter<std::optional<T>> : fmt::formatter<T> {
namespace FdbApiTester {
std::string lowerCase(const std::string& str);
class Random {
public:
Random();
@ -82,6 +86,25 @@ void print_internal_error(const char* msg, const char* file, int line);
} \
} while (false) // For use in destructors, where throwing exceptions is extremely dangerous
using TimePoint = std::chrono::steady_clock::time_point;
using TimeDuration = std::chrono::microseconds::rep;
static inline TimePoint timeNow() {
return std::chrono::steady_clock::now();
}
static inline TimeDuration timeElapsedInUs(const TimePoint& start, const TimePoint& end) {
return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}
static inline TimeDuration timeElapsedInUs(const TimePoint& start) {
return timeElapsedInUs(start, timeNow());
}
static inline double microsecToSec(TimeDuration timeUs) {
return timeUs / 1000000.0;
}
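// Typical usage, as in the transaction contexts above:
//   auto start = timeNow();
//   ... a potentially blocking wait ...
//   if (timeElapsedInUs(start) > LONG_WAIT_TIME_US) { /* log the long wait */ }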
} // namespace FdbApiTester
#endif

View File

@ -20,11 +20,14 @@
#include "TesterWorkload.h"
#include "TesterUtil.h"
#include "fmt/core.h"
#include "test/apitester/TesterScheduler.h"
#include <cstdlib>
#include <memory>
#include <fmt/format.h>
#include <vector>
#include <iostream>
#include <cstdio>
namespace FdbApiTester {
@ -58,6 +61,23 @@ double WorkloadConfig::getFloatOption(const std::string& name, double defaultVal
}
}
bool WorkloadConfig::getBoolOption(const std::string& name, bool defaultVal) const {
auto iter = options.find(name);
if (iter == options.end()) {
return defaultVal;
} else {
std::string val = lowerCase(iter->second);
if (val == "true") {
return true;
} else if (val == "false") {
return false;
} else {
throw TesterError(
fmt::format("Invalid workload configuration. Invalid value {} for {}", iter->second, name));
}
}
}
WorkloadBase::WorkloadBase(const WorkloadConfig& config)
: manager(nullptr), tasksScheduled(0), numErrors(0), clientId(config.clientId), numClients(config.numClients),
failed(false) {
@ -90,7 +110,7 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskF
if (tx->getErrorCode() == error_code_success) {
cont();
} else {
std::string msg = fmt::format("Transaction failed with error: {} ({}})", err, fdb_get_error(err));
std::string msg = fmt::format("Transaction failed with error: {} ({})", err, fdb_get_error(err));
if (failOnError) {
error(msg);
failed = true;
@ -127,9 +147,15 @@ void WorkloadBase::scheduledTaskDone() {
}
}
void WorkloadBase::confirmProgress() {
info("Progress confirmed");
manager->confirmProgress(this);
}
void WorkloadManager::add(std::shared_ptr<IWorkload> workload, TTaskFct cont) {
std::unique_lock<std::mutex> lock(mutex);
workloads[workload.get()] = WorkloadInfo{ workload, cont };
workloads[workload.get()] = WorkloadInfo{ workload, cont, workload->getControlIfc(), false };
fmt::print(stderr, "Workload {} added\n", workload->getWorkloadId());
}
void WorkloadManager::run() {
@ -144,6 +170,13 @@ void WorkloadManager::run() {
iter->start();
}
scheduler->join();
if (ctrlInputThread.joinable()) {
ctrlInputThread.join();
}
if (outputPipe.is_open()) {
outputPipe << "DONE" << std::endl;
outputPipe.close();
}
if (failed()) {
fmt::print(stderr, "{} workloads failed\n", numWorkloadsFailed);
} else {
@ -169,6 +202,90 @@ void WorkloadManager::workloadDone(IWorkload* workload, bool failed) {
}
}
void WorkloadManager::openControlPipes(const std::string& inputPipeName, const std::string& outputPipeName) {
if (!inputPipeName.empty()) {
ctrlInputThread = std::thread(&WorkloadManager::readControlInput, this, inputPipeName);
}
if (!outputPipeName.empty()) {
fmt::print(stderr, "Opening pipe {} for writing\n", outputPipeName);
outputPipe.open(outputPipeName, std::ofstream::out);
}
}
void WorkloadManager::readControlInput(std::string pipeName) {
fmt::print(stderr, "Opening pipe {} for reading\n", pipeName);
// Open in binary mode and read char-by-char to avoid
// any kind of buffering
FILE* f = fopen(pipeName.c_str(), "rb");
if (f == nullptr) {
fmt::print(stderr, "Failed to open pipe {} for reading\n", pipeName);
return;
}
setbuf(f, NULL);
std::string line;
while (true) {
int ch = fgetc(f);
if (ch == EOF) {
return;
}
if (ch != '\n') {
line += ch;
continue;
}
if (line.empty()) {
continue;
}
fmt::print(stderr, "Received {} command\n", line);
if (line == "STOP") {
handleStopCommand();
} else if (line == "CHECK") {
handleCheckCommand();
}
line.clear();
}
}
void WorkloadManager::handleStopCommand() {
std::unique_lock<std::mutex> lock(mutex);
for (auto& iter : workloads) {
IWorkloadControlIfc* controlIfc = iter.second.controlIfc;
if (controlIfc) {
controlIfc->stop();
}
}
}
void WorkloadManager::handleCheckCommand() {
std::unique_lock<std::mutex> lock(mutex);
// Request to confirm progress from all workloads
// providing the control interface
for (auto& iter : workloads) {
IWorkloadControlIfc* controlIfc = iter.second.controlIfc;
if (controlIfc) {
iter.second.progressConfirmed = false;
controlIfc->checkProgress();
}
}
}
void WorkloadManager::confirmProgress(IWorkload* workload) {
std::unique_lock<std::mutex> lock(mutex);
// Save the progress confirmation of the workload
auto iter = workloads.find(workload);
ASSERT(iter != workloads.end());
iter->second.progressConfirmed = true;
// Check if all workloads have confirmed progress
bool allConfirmed = true;
for (auto& iter : workloads) {
if (iter.second.controlIfc && !iter.second.progressConfirmed) {
allConfirmed = false;
break;
}
}
lock.unlock();
if (allConfirmed) {
// Notify the test controller about the successful progress check
ASSERT(outputPipe.is_open());
outputPipe << "CHECK_OK" << std::endl;
}
}
std::shared_ptr<IWorkload> IWorkloadFactory::create(std::string const& name, const WorkloadConfig& config) {
auto it = factories().find(name);
if (it == factories().end())
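
The protocol on the control pipes is line-based: the test controller writes STOP or CHECK to the tester's input pipe; the tester answers on its output pipe with CHECK_OK once every controllable workload has confirmed progress, and DONE when all workloads complete. A minimal controller-side sketch in Python, assuming the FIFOs were created with os.mkfifo and passed to the tester via --input-pipe/--output-pipe (placeholder pipe names):

import os

ctrl_pipe = os.open("input.pipe", os.O_WRONLY)  # the tester reads commands here

# Trigger a progress check and wait for its confirmation
os.write(ctrl_pipe, b"CHECK\n")
with open("output.pipe", "r") as out:  # the tester writes events here
    for line in out:
        if line.strip() == "CHECK_OK":  # all workloads made progress
            break

os.write(ctrl_pipe, b"STOP\n")  # ask the workloads to finish
os.close(ctrl_pipe)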

View File

@ -29,11 +29,23 @@
#include <atomic>
#include <unordered_map>
#include <mutex>
#include <thread>
#include <fstream>
namespace FdbApiTester {
class WorkloadManager;
class IWorkloadControlIfc {
public:
// Stop the workload
virtual void stop() = 0;
// Check if the test is progressing from the point of this call.
// Progress must be confirmed by calling confirmProgress on the workload manager
virtual void checkProgress() = 0;
};
// Workload interface
class IWorkload {
public:
@ -44,6 +56,12 @@ public:
// Start executing the workload
virtual void start() = 0;
// Get workload identifier
virtual std::string getWorkloadId() = 0;
// Get workload control interface if supported, nullptr otherwise
virtual IWorkloadControlIfc* getControlIfc() = 0;
};
// Workload configuration
@ -57,12 +75,16 @@ struct WorkloadConfig {
// Total number of clients
int numClients;
// Selected FDB API version
int apiVersion;
// Workload options: as key-value pairs
std::unordered_map<std::string, std::string> options;
// Get an option of a certain type by name. Throws an exception if the value is of a wrong type
int getIntOption(const std::string& name, int defaultVal) const;
double getFloatOption(const std::string& name, double defaultVal) const;
bool getBoolOption(const std::string& name, bool defaultVal) const;
};
// A base class for test workloads
@ -74,6 +96,10 @@ public:
// Initialize the workload
void init(WorkloadManager* manager) override;
IWorkloadControlIfc* getControlIfc() override { return nullptr; }
std::string getWorkloadId() override { return workloadId; }
protected:
// Schedule a task as a part of the workload
void schedule(TTaskFct task);
@ -92,6 +118,9 @@ protected:
// Log an info message
void info(const std::string& msg);
// Confirm a successful progress check
void confirmProgress();
private:
WorkloadManager* manager;
@ -130,6 +159,9 @@ public:
WorkloadManager(ITransactionExecutor* txExecutor, IScheduler* scheduler)
: txExecutor(txExecutor), scheduler(scheduler), numWorkloadsFailed(0) {}
// Open named pipes for communication with the test controller
void openControlPipes(const std::string& inputPipeName, const std::string& outputPipeName);
// Add a workload
// A continuation is to be specified for subworkloads
void add(std::shared_ptr<IWorkload> workload, TTaskFct cont = NO_OP_TASK);
@ -152,11 +184,27 @@ private:
std::shared_ptr<IWorkload> ref;
// Continuation to be executed after completing the workload
TTaskFct cont;
// Control interface of the workload (optional)
IWorkloadControlIfc* controlIfc;
// Progress check confirmation status
bool progressConfirmed;
};
// To be called by a workload to notify that it is done
void workloadDone(IWorkload* workload, bool failed);
// To be called by a workload to confirm a successful progress check
void confirmProgress(IWorkload* workload);
// Receive and handle control commands from the input pipe
void readControlInput(std::string pipeName);
// Handle STOP command received from the test controller
void handleStopCommand();
// Handle CHECK command received from the test controller
void handleCheckCommand();
// Transaction executor to be used by the workloads
ITransactionExecutor* txExecutor;
@ -171,6 +219,12 @@ private:
// Number of workloads failed
int numWorkloadsFailed;
// Thread for receiving test control commands
std::thread ctrlInputThread;
// Output pipe for emitting test control events
std::ofstream outputPipe;
};
// A workload factory

View File

@ -45,7 +45,13 @@ enum TesterOptionId {
OPT_TRACE_FORMAT,
OPT_KNOB,
OPT_EXTERNAL_CLIENT_LIBRARY,
OPT_TEST_FILE
OPT_EXTERNAL_CLIENT_DIRECTORY,
OPT_DISABLE_LOCAL_CLIENT,
OPT_TEST_FILE,
OPT_INPUT_PIPE,
OPT_OUTPUT_PIPE,
OPT_FDB_API_VERSION,
OPT_TRANSACTION_RETRY_LIMIT
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -59,8 +65,14 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
{ OPT_KNOB, "--knob-", SO_REQ_SEP },
{ OPT_EXTERNAL_CLIENT_LIBRARY, "--external-client-library", SO_REQ_SEP },
{ OPT_EXTERNAL_CLIENT_DIRECTORY, "--external-client-dir", SO_REQ_SEP },
{ OPT_DISABLE_LOCAL_CLIENT, "--disable-local-client", SO_NONE },
{ OPT_TEST_FILE, "-f", SO_REQ_SEP },
{ OPT_TEST_FILE, "--test-file", SO_REQ_SEP },
{ OPT_INPUT_PIPE, "--input-pipe", SO_REQ_SEP },
{ OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP },
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
SO_END_OF_OPTIONS };
void printProgramUsage(const char* execName) {
@ -84,9 +96,22 @@ void printProgramUsage(const char* execName) {
" Changes a knob option. KNOBNAME should be lowercase.\n"
" --external-client-library FILE\n"
" Path to the external client library.\n"
" --external-client-dir DIR\n"
" Directory containing external client libraries.\n"
" --disable-local-client DIR\n"
" Disable the local client, i.e. use only external client libraries.\n"
" --input-pipe NAME\n"
" Name of the input pipe for communication with the test controller.\n"
" --output-pipe NAME\n"
" Name of the output pipe for communication with the test controller.\n"
" --api-version VERSION\n"
" Required FDB API version (default %d).\n"
" --transaction-retry-limit NUMBER\n"
" Maximum number of retries per tranaction (default: 0 - unlimited)\n"
" -f, --test-file FILE\n"
" Test file to run.\n"
" -h, --help Display this help and exit.\n");
" -h, --help Display this help and exit.\n",
FDB_API_VERSION);
}
// Extracts the key for command line arguments that are specified with a prefix (e.g. --knob-).
@ -106,6 +131,19 @@ bool validateTraceFormat(std::string_view format) {
return format == "xml" || format == "json";
}
const int MIN_TESTABLE_API_VERSION = 400;
void processIntOption(const std::string& optionName, const std::string& value, int minValue, int maxValue, int& res) {
char* endptr;
res = strtol(value.c_str(), &endptr, 10);
if (*endptr != '\0') {
throw TesterError(fmt::format("Invalid value {} for {}", value, optionName));
}
if (res < minValue || res > maxValue) {
throw TesterError(fmt::format("Value for {} must be between {} and {}", optionName, minValue, maxValue));
}
}
bool processArg(TesterOptions& options, const CSimpleOpt& args) {
switch (args.OptionId()) {
case OPT_CONNFILE:
@ -139,11 +177,29 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
case OPT_EXTERNAL_CLIENT_LIBRARY:
options.externalClientLibrary = args.OptionArg();
break;
case OPT_EXTERNAL_CLIENT_DIRECTORY:
options.externalClientDir = args.OptionArg();
break;
case OPT_DISABLE_LOCAL_CLIENT:
options.disableLocalClient = true;
break;
case OPT_TEST_FILE:
options.testFile = args.OptionArg();
options.testSpec = readTomlTestSpec(options.testFile);
break;
case OPT_INPUT_PIPE:
options.inputPipeName = args.OptionArg();
break;
case OPT_OUTPUT_PIPE:
options.outputPipeName = args.OptionArg();
break;
case OPT_FDB_API_VERSION:
processIntOption(
args.OptionText(), args.OptionArg(), MIN_TESTABLE_API_VERSION, FDB_API_VERSION, options.apiVersion);
break;
case OPT_TRANSACTION_RETRY_LIMIT:
processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit);
break;
}
return true;
}
@ -184,6 +240,16 @@ void applyNetworkOptions(TesterOptions& options) {
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
fdb_check(
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_LIBRARY, options.externalClientLibrary));
} else if (!options.externalClientDir.empty()) {
if (options.disableLocalClient) {
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
}
fdb_check(
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY, options.externalClientDir));
} else {
if (options.disableLocalClient) {
throw TesterError("Invalid options: Cannot disable local client if no external library is provided");
}
}
if (options.testSpec.multiThreaded) {
@ -220,34 +286,44 @@ void randomizeOptions(TesterOptions& options) {
}
bool runWorkloads(TesterOptions& options) {
TransactionExecutorOptions txExecOptions;
txExecOptions.blockOnFutures = options.testSpec.blockOnFutures;
txExecOptions.numDatabases = options.numDatabases;
txExecOptions.databasePerTransaction = options.testSpec.databasePerTransaction;
try {
TransactionExecutorOptions txExecOptions;
txExecOptions.blockOnFutures = options.testSpec.blockOnFutures;
txExecOptions.numDatabases = options.numDatabases;
txExecOptions.databasePerTransaction = options.testSpec.databasePerTransaction;
txExecOptions.transactionRetryLimit = options.transactionRetryLimit;
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
scheduler->start();
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);
txExecutor->init(scheduler.get(), options.clusterFile.c_str());
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
for (const auto& workloadSpec : options.testSpec.workloads) {
for (int i = 0; i < options.numClients; i++) {
WorkloadConfig config;
config.name = workloadSpec.name;
config.options = workloadSpec.options;
config.clientId = i;
config.numClients = options.numClients;
std::shared_ptr<IWorkload> workload = IWorkloadFactory::create(workloadSpec.name, config);
if (!workload) {
throw TesterError(fmt::format("Unknown workload '{}'", workloadSpec.name));
WorkloadManager workloadMgr(txExecutor.get(), scheduler.get());
for (const auto& workloadSpec : options.testSpec.workloads) {
for (int i = 0; i < options.numClients; i++) {
WorkloadConfig config;
config.name = workloadSpec.name;
config.options = workloadSpec.options;
config.clientId = i;
config.numClients = options.numClients;
config.apiVersion = options.apiVersion;
std::shared_ptr<IWorkload> workload = IWorkloadFactory::create(workloadSpec.name, config);
if (!workload) {
throw TesterError(fmt::format("Unknown workload '{}'", workloadSpec.name));
}
workloadMgr.add(workload);
}
workloadMgr.add(workload);
}
}
if (!options.inputPipeName.empty() || !options.outputPipeName.empty()) {
workloadMgr.openControlPipes(options.inputPipeName, options.outputPipeName);
}
workloadMgr.run();
return !workloadMgr.failed();
scheduler->start();
workloadMgr.run();
return !workloadMgr.failed();
} catch (const std::runtime_error& err) {
fmt::print(stderr, "ERROR: {}\n", err.what());
return false;
}
}
} // namespace
@ -264,7 +340,7 @@ int main(int argc, char** argv) {
}
randomizeOptions(options);
fdb_check(fdb_select_api_version(options.testSpec.apiVersion));
fdb_check(fdb_select_api_version(options.apiVersion));
applyNetworkOptions(options);
fdb_check(fdb_setup_network());

View File

@ -0,0 +1,35 @@
[[test]]
title = 'Mixed Workload for Upgrade Tests with a Multi-Threaded Client'
multiThreaded = true
buggify = true
databasePerTransaction = false
minFdbThreads = 2
maxFdbThreads = 8
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
runUntilStop = true
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
runUntilStop = true
readExistingKeysRatio = 0.9

View File

@ -0,0 +1,33 @@
[[test]]
title = 'Mixed Workload for Upgrade Tests with a Single FDB Thread'
multiThreaded = false
buggify = true
databasePerTransaction = false
minDatabases = 2
maxDatabases = 8
minClientThreads = 2
maxClientThreads = 8
minClients = 2
maxClients = 8
[[test.workload]]
name = 'ApiCorrectness'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
runUntilStop = true
readExistingKeysRatio = 0.9
[[test.workload]]
name = 'CancelTransaction'
minKeyLength = 1
maxKeyLength = 64
minValueLength = 1
maxValueLength = 1000
maxKeysPerTransaction = 50
initialSize = 100
runUntilStop = true
readExistingKeysRatio = 0.9

View File

@ -4,10 +4,10 @@ import os
import shutil
import subprocess
import sys
from local_cluster import LocalCluster
from local_cluster import random_secret_string
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from pathlib import Path
from random import choice
class ClusterFileGenerator:
def __init__(self, output_dir: str):
@ -15,8 +15,7 @@ class ClusterFileGenerator:
assert self.output_dir.exists(), "{} does not exist".format(output_dir)
assert self.output_dir.is_dir(), "{} is not a directory".format(output_dir)
self.tmp_dir = self.output_dir.joinpath(
'tmp',
''.join(choice(LocalCluster.valid_letters_for_secret) for i in range(16)))
'tmp', random_secret_string(16))
self.tmp_dir.mkdir(parents=True)
self.cluster_file_path = self.tmp_dir.joinpath('fdb.cluster')
@ -43,11 +42,13 @@ if __name__ == '__main__':
Before the command is executed, the following arguments will be preprocessed:
- All occurrences of @CLUSTER_FILE@ will be replaced with the path to the generated cluster file.
The environment variable FDB_CLUSTER_FILE is set to the generated cluster file for the command if
it is not set already.
""")
parser.add_argument('--output-dir', '-o', metavar='OUTPUT_DIRECTORY', help='Directory where output files are written', required=True)
parser.add_argument('cmd', metavar="COMMAND", nargs="+", help="The command to run")
parser.add_argument('--output-dir', '-o', metavar='OUTPUT_DIRECTORY',
help='Directory where output files are written', required=True)
parser.add_argument('cmd', metavar="COMMAND",
nargs="+", help="The command to run")
args = parser.parse_args()
errcode = 1
@ -61,7 +62,9 @@ if __name__ == '__main__':
cmd_args.append(cmd)
env = dict(**os.environ)
env['FDB_CLUSTER_FILE'] = env.get('FDB_CLUSTER_FILE', generator.cluster_file_path)
errcode = subprocess.run(cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode
env['FDB_CLUSTER_FILE'] = env.get(
'FDB_CLUSTER_FILE', generator.cluster_file_path)
errcode = subprocess.run(
cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode
sys.exit(errcode)

View File

@ -1,10 +1,11 @@
import json
from pathlib import Path
from argparse import ArgumentParser
import random
import string
import subprocess
import sys
import os
import socket
import time
def _get_free_port_internal():
@ -25,6 +26,19 @@ def get_free_port():
return port
def is_port_in_use(port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
valid_letters_for_secret = string.ascii_letters + string.digits
def random_secret_string(len):
return ''.join(random.choice(valid_letters_for_secret) for i in range(len))
class LocalCluster:
configuration_template = """
## foundationdb.conf
@ -48,7 +62,7 @@ cluster-file = {etcdir}/fdb.cluster
## Default parameters for individual fdbserver processes
[fdbserver]
command = {fdbserver_bin}
public-address = auto:$ID
public-address = {ip_address}:$ID
listen-address = public
datadir = {datadir}/$ID
logdir = {logdir}
@ -65,76 +79,148 @@ logdir = {logdir}
## An individual fdbserver process with id 4000
## Parameters set here override defaults from the [fdbserver] section
"""
valid_letters_for_secret = string.ascii_letters + string.digits
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str,
fdbcli_binary: str, process_number: int, create_config=True, port=None, ip_address=None):
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, fdbcli_binary: str,
process_number: int, create_config=True, port=None, ip_address=None):
self.basedir = Path(basedir)
self.etc = self.basedir.joinpath('etc')
self.log = self.basedir.joinpath('log')
self.data = self.basedir.joinpath('data')
self.conf_file = self.etc.joinpath('foundationdb.conf')
self.cluster_file = self.etc.joinpath('fdb.cluster')
self.fdbserver_binary = Path(fdbserver_binary)
self.fdbmonitor_binary = Path(fdbmonitor_binary)
self.fdbcli_binary = Path(fdbcli_binary)
for b in (self.fdbserver_binary, self.fdbmonitor_binary, self.fdbcli_binary):
assert b.exists(), "{} does not exist".format(b)
if not self.basedir.exists():
self.basedir.mkdir()
self.etc = self.basedir.joinpath('etc')
self.log = self.basedir.joinpath('log')
self.data = self.basedir.joinpath('data')
self.etc.mkdir(exist_ok=True)
self.log.mkdir(exist_ok=True)
self.data.mkdir(exist_ok=True)
self.port = get_free_port() if port is None else port
self.process_number = process_number
self.ip_address = '127.0.0.1' if ip_address is None else ip_address
self.first_port = port
if (self.first_port is not None):
self.last_used_port = int(self.first_port)-1
self.server_ports = [self.__next_port()
for _ in range(self.process_number)]
self.cluster_desc = random_secret_string(8)
self.cluster_secret = random_secret_string(8)
self.env_vars = {}
self.running = False
self.process = None
self.fdbmonitor_logfile = None
self.use_legacy_conf_syntax = False
if create_config:
with open(self.etc.joinpath('fdb.cluster'), 'x') as f:
random_string = lambda len : ''.join(random.choice(LocalCluster.valid_letters_for_secret) for i in range(len))
f.write('{desc}:{secret}@{ip_addr}:{server_port}'.format(
desc=random_string(8),
secret=random_string(8),
ip_addr=self.ip_address,
server_port=self.port
))
with open(self.etc.joinpath('foundationdb.conf'), 'x') as f:
f.write(LocalCluster.configuration_template.format(
etcdir=self.etc,
fdbserver_bin=self.fdbserver_binary,
datadir=self.data,
logdir=self.log
))
# By default, the cluster only has one process
# If a port number is given and process_number > 1, we will use subsequent numbers
# E.g., port = 4000, process_number = 5
# Then 4000,4001,4002,4003,4004 will be used as ports
# If port number is not given, we will randomly pick free ports
for index, _ in enumerate(range(process_number)):
f.write('[fdbserver.{server_port}]\n'.format(server_port=self.port))
self.port = get_free_port() if port is None else str(int(self.port) + 1)
self.create_cluster_file()
self.save_config()
def __enter__(self):
def __next_port(self):
if (self.first_port is None):
return get_free_port()
else:
self.last_used_port += 1
return self.last_used_port
def save_config(self):
new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new')
with open(new_conf_file, 'x') as f:
conf_template = LocalCluster.configuration_template
if (self.use_legacy_conf_syntax):
conf_template = conf_template.replace("-", "_")
f.write(conf_template.format(
etcdir=self.etc,
fdbserver_bin=self.fdbserver_binary,
datadir=self.data,
logdir=self.log,
ip_address=self.ip_address
))
# By default, the cluster only has one process
# If a port number is given and process_number > 1, we will use subsequent numbers
# E.g., port = 4000, process_number = 5
# Then 4000,4001,4002,4003,4004 will be used as ports
# If port number is not given, we will randomly pick free ports
for port in self.server_ports:
f.write('[fdbserver.{server_port}]\n'.format(
server_port=port))
f.flush()
os.fsync(f.fileno())
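# Atomically swap the new file in, so fdbmonitor never
# observes a partially written configuration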
os.replace(new_conf_file, self.conf_file)
def create_cluster_file(self):
with open(self.cluster_file, 'x') as f:
f.write('{desc}:{secret}@{ip_addr}:{server_port}'.format(
desc=self.cluster_desc,
secret=self.cluster_secret,
ip_addr=self.ip_address,
server_port=self.server_ports[0]
))
def start_cluster(self):
assert not self.running, "Can't start a server that is already running"
args = [str(self.fdbmonitor_binary),
'--conffile',
str(self.etc.joinpath('foundationdb.conf')),
'--lockfile',
str(self.etc.joinpath('fdbmonitor.lock'))]
self.fdbmonitor_logfile = open(self.log.joinpath('fdbmonitor.log'), 'w')
self.process = subprocess.Popen(args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile)
self.fdbmonitor_logfile = open(
self.log.joinpath('fdbmonitor.log'), 'w')
self.process = subprocess.Popen(
args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile, env=self.process_env())
self.running = True
return self
def __exit__(self, xc_type, exc_value, traceback):
def stop_cluster(self):
assert self.running, "Server is not running"
if self.process.poll() is None:
self.process.terminate()
self.running = False
def create_database(self, storage='ssd'):
args = [self.fdbcli_binary, '-C', self.etc.joinpath('fdb.cluster'), '--exec',
'configure new single {} tenant_mode=optional_experimental'.format(storage)]
subprocess.run(args)
def ensure_ports_released(self, timeout_sec=5):
sec = 0
while (sec < timeout_sec):
in_use = False
for port in self.server_ports:
if is_port_in_use(port):
print("Port {} in use. Waiting for it to be released".format(port))
in_use = True
break
if not in_use:
return
time.sleep(0.5)
sec += 0.5
assert False, "Failed to release ports in {}s".format(timeout_sec)
def __enter__(self):
self.start_cluster()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.stop_cluster()
def create_database(self, storage='ssd', enable_tenants=True):
db_config = 'configure new single {}'.format(storage)
if (enable_tenants):
db_config += " tenant_mode=optional_experimental"
args = [self.fdbcli_binary, '-C',
self.cluster_file, '--exec', db_config]
res = subprocess.run(args, env=self.process_env())
assert res.returncode == 0, "Create database failed with {}".format(
res.returncode)
def get_status(self):
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
'status json']
res = subprocess.run(args, env=self.process_env(),
stdout=subprocess.PIPE)
assert res.returncode == 0, "Get status failed with {}".format(
res.returncode)
return json.loads(res.stdout)
def process_env(self):
env = dict(os.environ)
env.update(self.env_vars)
return env
def set_env_var(self, var_name, var_val):
self.env_vars[var_name] = var_val
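
For reference, a minimal usage sketch of the extended LocalCluster API (the binary paths are placeholders; they must point to existing binaries):

from local_cluster import LocalCluster

cluster = LocalCluster("/tmp/test-cluster", "/path/to/fdbserver",
                       "/path/to/fdbmonitor", "/path/to/fdbcli",
                       process_number=3)
with cluster:                      # start_cluster() on enter, stop_cluster() on exit
    cluster.create_database()      # 'configure new single ssd ...' via fdbcli
    status = cluster.get_status()  # parsed output of 'status json'
    print(len(status["cluster"]["processes"]))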

View File

@ -5,9 +5,8 @@ import os
import shutil
import subprocess
import sys
from local_cluster import LocalCluster
from local_cluster import LocalCluster, random_secret_string
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from random import choice
from pathlib import Path
@ -18,8 +17,7 @@ class TempCluster:
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
tmp_dir = self.build_dir.joinpath(
"tmp",
"".join(choice(LocalCluster.valid_letters_for_secret)
for i in range(16)),
random_secret_string(16)
)
tmp_dir.mkdir(parents=True)
self.cluster = LocalCluster(

tests/TestRunner/upgrade_test.py (new executable file, 463 lines)
View File

@ -0,0 +1,463 @@
#!/usr/bin/env python3
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import glob
import os
from pathlib import Path
import platform
import random
import shutil
import stat
import subprocess
import sys
from threading import Thread, Event
import traceback
import time
from urllib import request
from local_cluster import LocalCluster, random_secret_string
SUPPORTED_PLATFORMS = ["x86_64"]
SUPPORTED_VERSIONS = ["7.2.0", "7.1.0", "7.0.0", "6.3.24", "6.3.23",
"6.3.22", "6.3.18", "6.3.17", "6.3.16", "6.3.15", "6.3.13", "6.3.12", "6.3.9", "6.2.30",
"6.2.29", "6.2.28", "6.2.27", "6.2.26", "6.2.25", "6.2.24", "6.2.23", "6.2.22", "6.2.21",
"6.2.20", "6.2.19", "6.2.18", "6.2.17", "6.2.16", "6.2.15", "6.2.10", "6.1.13", "6.1.12",
"6.1.11", "6.1.10", "6.0.18", "6.0.17", "6.0.16", "6.0.15", "6.0.14", "5.2.8", "5.2.7",
"5.1.7", "5.1.6"]
FDB_DOWNLOAD_ROOT = "https://github.com/apple/foundationdb/releases/download/"
CURRENT_VERSION = "7.2.0"
HEALTH_CHECK_TIMEOUT_SEC = 5
PROGRESS_CHECK_TIMEOUT_SEC = 30
TRANSACTION_RETRY_LIMIT = 100
RUN_WITH_GDB = False
def make_executable(path):
st = os.stat(path)
os.chmod(path, st.st_mode | stat.S_IEXEC)
def remove_file_no_fail(filename):
try:
os.remove(filename)
except OSError:
pass
def version_from_str(ver_str):
ver = [int(s) for s in ver_str.split(".")]
assert len(ver) == 3, "Invalid version string {}".format(ver_str)
return ver
def api_version_from_str(ver_str):
ver_tuple = version_from_str(ver_str)
return ver_tuple[0]*100+ver_tuple[1]*10
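# e.g. api_version_from_str("7.1.0") == 710 and api_version_from_str("6.3.23") == 630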
def version_before(ver_str1, ver_str2):
return version_from_str(ver_str1) < version_from_str(ver_str2)
def random_sleep(minSec, maxSec):
timeSec = random.uniform(minSec, maxSec)
print("Sleeping for {0:.3f}s".format(timeSec))
time.sleep(timeSec)
class UpgradeTest:
def __init__(self, build_dir: str, upgrade_path: list, process_number: int = 1, port: str = None):
self.build_dir = Path(build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
self.upgrade_path = upgrade_path
for version in upgrade_path:
assert version in SUPPORTED_VERSIONS, "Unsupported version {}".format(
version)
self.platform = platform.machine()
assert self.platform in SUPPORTED_PLATFORMS, "Unsupported platform {}".format(
self.platform)
self.tmp_dir = self.build_dir.joinpath(
"tmp",
random_secret_string(16)
)
self.tmp_dir.mkdir(parents=True)
self.download_dir = self.build_dir.joinpath(
"tmp",
"old_binaries"
)
self.download_old_binaries()
self.create_external_lib_dir()
init_version = upgrade_path[0]
self.cluster = LocalCluster(
self.tmp_dir,
self.binary_path(init_version, "fdbserver"),
self.binary_path(init_version, "fdbmonitor"),
self.binary_path(init_version, "fdbcli"),
process_number,
port=port,
create_config=False
)
self.cluster.create_cluster_file()
self.configure_version(init_version)
self.log = self.cluster.log
self.etc = self.cluster.etc
self.data = self.cluster.data
self.input_pipe_path = self.tmp_dir.joinpath(
"input.{}".format(random_secret_string(8)))
self.output_pipe_path = self.tmp_dir.joinpath(
"output.{}".format(random_secret_string(8)))
os.mkfifo(self.input_pipe_path)
os.mkfifo(self.output_pipe_path)
self.progress_event = Event()
def binary_path(self, version, bin_name):
if version == CURRENT_VERSION:
return self.build_dir.joinpath("bin", bin_name)
else:
return self.download_dir.joinpath(version, bin_name)
def lib_dir(self, version):
if version == CURRENT_VERSION:
return self.build_dir.joinpath("lib")
else:
return self.download_dir.joinpath(version)
# Download an old binary of a given version from a remote repository
def download_old_binary(self, version, target_bin_name, remote_bin_name, makeExecutable):
local_file = self.binary_path(version, target_bin_name)
if (local_file.exists()):
return
self.download_dir.joinpath(version).mkdir(
parents=True, exist_ok=True)
remote_file = "{}{}/{}".format(FDB_DOWNLOAD_ROOT,
version, remote_bin_name)
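# e.g. https://github.com/apple/foundationdb/releases/download/6.3.23/fdbserver.x86_64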
print("Downloading '{}' to '{}'...".format(remote_file, local_file))
request.urlretrieve(remote_file, local_file)
print("Download complete")
assert local_file.exists(), "{} does not exist".format(local_file)
if makeExecutable:
make_executable(local_file)
# Download all old binaries required for testing the specified upgrade path
def download_old_binaries(self):
for version in self.upgrade_path:
if version == CURRENT_VERSION:
continue
self.download_old_binary(version,
"fdbserver", "fdbserver.{}".format(self.platform), True)
self.download_old_binary(version,
"fdbmonitor", "fdbmonitor.{}".format(self.platform), True)
self.download_old_binary(version,
"fdbcli", "fdbcli.{}".format(self.platform), True)
self.download_old_binary(version,
"libfdb_c.so", "libfdb_c.{}.so".format(self.platform), False)
# Create a directory for external client libraries for MVC and fill it
# with the libraries necessary for the specified upgrade path
def create_external_lib_dir(self):
self.external_lib_dir = self.tmp_dir.joinpath("client_libs")
self.external_lib_dir.mkdir(parents=True)
for version in self.upgrade_path:
src_file_path = self.lib_dir(version).joinpath("libfdb_c.so")
assert src_file_path.exists(), "{} does not exist".format(src_file_path)
target_file_path = self.external_lib_dir.joinpath(
"libfdb_c.{}.so".format(version))
shutil.copyfile(src_file_path, target_file_path)
# Perform a health check of the cluster: Use fdbcli status command to check if the number of
# server processes and their versions are as expected
def health_check(self, timeout_sec=HEALTH_CHECK_TIMEOUT_SEC):
retries = 0
while retries < timeout_sec:
retries += 1
status = self.cluster.get_status()
if not "processes" in status["cluster"]:
print("Health check: no processes found. Retrying")
time.sleep(1)
continue
num_proc = len(status["cluster"]["processes"])
if (num_proc < self.cluster.process_number):
print("Health check: {} of {} processes found. Retrying".format(
num_proc, self.cluster.process_number))
time.sleep(1)
continue
assert num_proc == self.cluster.process_number, "Number of processes: expected: {}, actual: {}".format(
self.cluster.process_number, num_proc)
for (_, proc_stat) in status["cluster"]["processes"].items():
proc_ver = proc_stat["version"]
assert proc_ver == self.cluster_version, "Process version: expected: {}, actual: {}".format(
self.cluster_version, proc_ver)
print("Health check: OK")
return
assert False, "Health check: Failed"
# Create and save a cluster configuration for the given version
def configure_version(self, version):
self.cluster.fdbmonitor_binary = self.binary_path(
version, "fdbmonitor")
self.cluster.fdbserver_binary = self.binary_path(version, "fdbserver")
self.cluster.fdbcli_binary = self.binary_path(version, "fdbcli")
self.cluster.set_env_var("LD_LIBRARY_PATH", self.lib_dir(version))
if (version_before(version, "7.1.0")):
self.cluster.use_legacy_conf_syntax = True
self.cluster.save_config()
self.cluster_version = version
# Upgrade the cluster to the given version
def upgrade_to(self, version):
print("Upgrading to version {}".format(version))
self.cluster.stop_cluster()
self.configure_version(version)
self.cluster.ensure_ports_released()
self.cluster.start_cluster()
print("Upgraded to {}".format(version))
def __enter__(self):
print("Starting cluster version {}".format(self.cluster_version))
self.cluster.start_cluster()
self.cluster.create_database(enable_tenants=False)
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cluster.stop_cluster()
shutil.rmtree(self.tmp_dir)
# Determine FDB API version matching the upgrade path
def determine_api_version(self):
self.api_version = api_version_from_str(CURRENT_VERSION)
for version in self.upgrade_path:
self.api_version = min(
api_version_from_str(version), self.api_version)
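# e.g. for the upgrade path ["6.3.23", "7.0.0", "6.3.23"] the tester runs
# with API version min(630, 700) == 630, supported by every version involved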
# Start the tester to generate the workload specified by the test file
def exec_workload(self, test_file):
self.tester_retcode = 1
try:
self.determine_api_version()
cmd_args = [self.tester_bin,
'--cluster-file', self.cluster.cluster_file,
'--test-file', test_file,
'--external-client-dir', self.external_lib_dir,
'--disable-local-client',
'--input-pipe', self.input_pipe_path,
'--output-pipe', self.output_pipe_path,
'--api-version', str(self.api_version),
'--log',
'--log-dir', self.log,
'--transaction-retry-limit', str(TRANSACTION_RETRY_LIMIT)]
if (RUN_WITH_GDB):
cmd_args = ['gdb', '-ex', 'run', '--args'] + cmd_args
print("Executing test command: {}".format(
" ".join([str(c) for c in cmd_args])))
self.tester_proc = subprocess.Popen(
cmd_args, stdout=sys.stdout, stderr=sys.stderr)
self.tester_retcode = self.tester_proc.wait()
self.tester_proc = None
if (self.tester_retcode != 0):
print("Tester failed with return code {}".format(
self.tester_retcode))
except Exception:
print("Execution of test workload failed")
print(traceback.format_exc())
# Perform a progress check: Trigger it and wait until it is completed
def progress_check(self, ctrl_pipe):
self.progress_event.clear()
os.write(ctrl_pipe, b"CHECK\n")
self.progress_event.wait(
None if RUN_WITH_GDB else PROGRESS_CHECK_TIMEOUT_SEC)
if (self.progress_event.is_set()):
print("Progress check: OK")
else:
assert False, "Progress check failed after upgrade to version {}".format(
self.cluster_version)
# The main function of a thread for reading and processing
# the notifications received from the tester
def output_pipe_reader(self):
try:
print("Opening pipe {} for reading".format(self.output_pipe_path))
self.output_pipe = open(self.output_pipe_path, 'r')
for line in self.output_pipe:
msg = line.strip()
print("Received {}".format(msg))
if (msg == "CHECK_OK"):
self.progress_event.set()
self.output_pipe.close()
except Exception as e:
print("Error while reading output pipe", e)
print(traceback.format_exc())
# Execute the upgrade test workflow according to the specified
# upgrade path: perform the upgrade steps and check success after each step
def exec_upgrade_test(self):
print("Opening pipe {} for writing".format(self.input_pipe_path))
ctrl_pipe = os.open(self.input_pipe_path, os.O_WRONLY)
try:
self.health_check()
self.progress_check(ctrl_pipe)
for version in self.upgrade_path[1:]:
random_sleep(0.0, 2.0)
self.upgrade_to(version)
self.health_check()
self.progress_check(ctrl_pipe)
os.write(ctrl_pipe, b"STOP\n")
finally:
os.close(ctrl_pipe)
# Kill the tester process if it is still alive
def kill_tester_if_alive(self, workload_thread):
if not workload_thread.is_alive():
return
if self.tester_proc is not None:
try:
print("Killing the tester process")
self.tester_proc.kill()
workload_thread.join(5)
except:
print("Failed to kill the tester process")
# The main method implementing the test:
# - Start a thread for generating the workload using a tester binary
# - Start a thread for reading notifications from the tester
# - Trigger the upgrade steps and checks in the main thread
def exec_test(self, args):
self.tester_bin = self.build_dir.joinpath("bin", "fdb_c_api_tester")
assert self.tester_bin.exists(), "{} does not exist".format(self.tester_bin)
self.tester_proc = None
test_retcode = 1
try:
workload_thread = Thread(
target=self.exec_workload, args=(args.test_file,))
workload_thread.start()
reader_thread = Thread(target=self.output_pipe_reader)
reader_thread.start()
self.exec_upgrade_test()
test_retcode = 0
except Exception:
print("Upgrade test failed")
print(traceback.format_exc())
self.kill_tester_if_alive(workload_thread)
finally:
workload_thread.join(5)
reader_thread.join(5)
self.kill_tester_if_alive(workload_thread)
if test_retcode == 0:
test_retcode = self.tester_retcode
return test_retcode
# Check the cluster log for errors
def check_cluster_logs(self, error_limit=100):
sev40s = (
subprocess.getoutput(
"grep -r 'Severity=\"40\"' {}".format(
self.cluster.log.as_posix())
)
.rstrip()
.splitlines()
)
err_cnt = 0
for line in sev40s:
# When running ASAN we expect to see this message. Boost coroutine should be using the
# correct asan annotations so that it shouldn't produce any false positives.
if line.endswith(
"WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases!"
):
continue
if (err_cnt < error_limit):
print(line)
err_cnt += 1
if err_cnt > 0:
print(
">>>>>>>>>>>>>>>>>>>> Found {} severity 40 events - the test fails", err_cnt)
else:
print("No error found in logs")
return err_cnt == 0
# Dump the last cluster configuration and cluster logs
def dump_cluster_logs(self):
for etc_file in glob.glob(os.path.join(self.cluster.etc, "*")):
print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(etc_file))
with open(etc_file, "r") as f:
print(f.read())
for log_file in glob.glob(os.path.join(self.cluster.log, "*")):
print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(log_file))
with open(log_file, "r") as f:
print(f.read())
if __name__ == "__main__":
parser = ArgumentParser(
formatter_class=RawDescriptionHelpFormatter,
description="""
A script for testing FDB multi-version client in upgrade scenarios. Creates a local cluster,
generates a workload using fdb_c_api_tester with a specified test file, and performs
cluster upgrade according to the specified upgrade path. Checks if the workload successfully
progresses after each upgrade step.
""",
)
parser.add_argument(
"--build-dir",
"-b",
metavar="BUILD_DIRECTORY",
help="FDB build directory",
required=True,
)
parser.add_argument(
'--upgrade-path',
nargs='+',
help='Cluster upgrade path: a space separated list of versions',
default=[CURRENT_VERSION]
)
parser.add_argument(
'--test-file',
help='A .toml file describing a test workload to be generated with fdb_c_api_tester',
required=True,
)
parser.add_argument(
"--process-number",
"-p",
help="Number of fdb processes running (default: 0 - random)",
type=int,
default=0,
)
parser.add_argument(
'--disable-log-dump',
help='Do not dump cluster log on error',
action="store_true"
)
parser.add_argument(
'--run-with-gdb',
help='Execute the tester binary from gdb',
action="store_true"
)
args = parser.parse_args()
if (args.process_number == 0):
args.process_number = random.randint(1, 5)
print("Testing with {} processes".format(args.process_number))
if (args.run_with_gdb):
RUN_WITH_GDB = True
errcode = 1
with UpgradeTest(args.build_dir, args.upgrade_path, args.process_number) as test:
print("log-dir: {}".format(test.log))
print("etc-dir: {}".format(test.etc))
print("data-dir: {}".format(test.data))
print("cluster-file: {}".format(test.etc.joinpath("fdb.cluster")))
errcode = test.exec_test(args)
if not test.check_cluster_logs():
errcode = 1 if errcode == 0 else errcode
if errcode != 0 and not args.disable_log_dump:
test.dump_cluster_logs()
sys.exit(errcode)