diff --git a/bindings/c/test/apitester/TesterApiWrapper.cpp b/bindings/c/test/apitester/TesterApiWrapper.cpp index 34b1fd802c..0955278d6b 100644 --- a/bindings/c/test/apitester/TesterApiWrapper.cpp +++ b/bindings/c/test/apitester/TesterApiWrapper.cpp @@ -75,6 +75,10 @@ void Transaction::reset() { fdb_transaction_reset(tx_.get()); } +fdb_error_t Transaction::setOption(FDBTransactionOption option) { + return fdb_transaction_set_option(tx_.get(), option, reinterpret_cast<const uint8_t*>(""), 0); +} + fdb_error_t FdbApi::setOption(FDBNetworkOption option, std::string_view value) { return fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.data()), value.size()); } diff --git a/bindings/c/test/apitester/TesterApiWrapper.h b/bindings/c/test/apitester/TesterApiWrapper.h index df6c3817dc..266358d022 100644 --- a/bindings/c/test/apitester/TesterApiWrapper.h +++ b/bindings/c/test/apitester/TesterApiWrapper.h @@ -70,6 +70,7 @@ public: Future commit(); Future onError(fdb_error_t err); void reset(); + fdb_error_t setOption(FDBTransactionOption option); private: std::shared_ptr<FDBTransaction> tx_; diff --git a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp index 3f2ee3573d..f01920cc3d 100644 --- a/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterCorrectnessWorkload.cpp @@ -30,7 +30,7 @@ namespace FdbApiTester { class ApiCorrectnessWorkload : public WorkloadBase { public: - enum OpType { OP_INSERT, OP_GET, OP_LAST = OP_GET }; + enum OpType { OP_INSERT, OP_GET, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ }; // The minimum length of a key int minKeyLength; @@ -47,6 +47,9 @@ public: // Maximum number of keys to be accessed by a transaction int maxKeysPerTransaction; + // Initial data size (number of key-value pairs) + int initialSize; + // The number of operations to be executed int numOperations; @@ -59,9 +62,10 @@ public: ApiCorrectnessWorkload(std::string_view prefix) { minKeyLength = 1; maxKeyLength = 64; - minValueLength = 1; - maxValueLength = 1000; + minValueLength = 5; + maxValueLength = 10; maxKeysPerTransaction = 50; + initialSize = 100; numOperations = 1000; readExistingKeysRatio = 0.9; keyPrefix = prefix; @@ -69,9 +73,16 @@ public: } void start() override { - schedule([this]() { nextTransaction(); }); + schedule([this]() { + // 1. Populate initial data + populateData([this]() { + // 2. Generate random workload + randomOperations(); + }); + }); } +private: std::string randomKeyName() { return keyPrefix + random.randomStringLowerCase(minKeyLength, maxKeyLength); } std::string randomValue() { return random.randomStringLowerCase(minValueLength, maxValueLength); } @@ -90,9 +101,13 @@ public: std::string key = store.getKey(genKey, true, 1); if (key != store.endKey()) { return key; - } else { - return store.getKey(genKey, true, 0); } + key = store.getKey(genKey, true, 0); + if (key != store.startKey()) { + return key; + } + std::cout << "No existing key found, using a new random key." << std::endl; + return genKey; } std::string randomKey(double existingKeyRatio) { @@ -120,7 +135,54 @@ public: for (const KeyValue& kv : *kvPairs) { store.set(kv.key, kv.value); } - cont(); + schedule(cont); + }); + } + + void randomCommitReadOp(TTaskFct cont) { + int numKeys = random.randomInt(1, maxKeysPerTransaction); + auto kvPairs = std::make_shared<std::vector<KeyValue>>(); + for (int i = 0; i < numKeys; i++) { + kvPairs->push_back(KeyValue{ randomKey(readExistingKeysRatio), randomValue() }); + } + execTransaction( + [kvPairs](auto ctx) { + for (const KeyValue& kv : *kvPairs) { + ctx->tx()->set(kv.key, kv.value); + } + ctx->commit(); + }, + [this, kvPairs, cont]() { + for (const KeyValue& kv : *kvPairs) { + store.set(kv.key, kv.value); + } + auto results = std::make_shared<std::vector<std::optional<std::string>>>(); + execTransaction( + [kvPairs, results](auto ctx) { + // TODO: Enable after merging with GRV caching + // ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE); + auto futures = std::make_shared<std::vector<Future>>(); + for (const auto& kv : *kvPairs) { + futures->push_back(ctx->tx()->get(kv.key, false)); + } + ctx->continueAfterAll(futures, [ctx, futures, results]() { + for (auto& f : *futures) { + results->push_back(((ValueFuture&)f).getValue()); + } + ctx->done(); + }); + }, + [this, kvPairs, results, cont]() { + ASSERT(results->size() == kvPairs->size()); + for (int i = 0; i < kvPairs->size(); i++) { + auto expected = store.get((*kvPairs)[i].key); + if ((*results)[i] != expected) { + std::cout << "randomCommitReadOp mismatch. key: " << (*kvPairs)[i].key + << " expected: " << expected << " actual: " << (*results)[i] << std::endl; + } + } + schedule(cont); + }); }); } @@ -149,11 +211,11 @@ public: for (int i = 0; i < keys->size(); i++) { auto expected = store.get((*keys)[i]); if ((*results)[i] != expected) { - std::cout << "randomGetOp mismatch. expected: " << expected << " actual: " << (*results)[i] - << std::endl; + std::cout << "randomGetOp mismatch. key :" << (*keys)[i] << " expected: " << expected + << " actual: " << (*results)[i] << std::endl; } } - cont(); + schedule(cont); }); } @@ -166,11 +228,21 @@ public: case OP_GET: randomGetOp(cont); break; + case OP_COMMIT_READ: + randomCommitReadOp(cont); + break; } } -private: - void nextTransaction() { + void populateData(TTaskFct cont) { + if (store.size() < initialSize) { + randomInsertOp([this, cont]() { populateData(cont); }); + } else { + schedule(cont); + } + } + + void randomOperations() { if (numOpLeft % 100 == 0) { std::cout << numOpLeft << " transactions left" << std::endl; } @@ -178,7 +250,7 @@ private: return; numOpLeft--; - randomOperation([this]() { schedule([this]() { nextTransaction(); }); }); + randomOperation([this]() { randomOperations(); }); } int numOpLeft;