Modified ConfigurationDatabaseWorkload to run modified cycle workload

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-23 14:05:14 -07:00
parent 4665508da1
commit ee4f9d24b9
3 changed files with 109 additions and 44 deletions

View File

@ -28,7 +28,7 @@
#include "fdbclient/FDBTypes.h"
#include "flow/flow.h"
class IConfigTransaction {
class IConfigTransaction : public ReferenceCounted<IConfigTransaction> {
public:
virtual void set(KeyRef key, ValueRef value) = 0;
virtual void clearRange(KeyRef begin, KeyRef end) = 0;
@ -41,7 +41,7 @@ public:
virtual void fullReset() = 0;
};
class SimpleConfigTransaction : public IConfigTransaction {
class SimpleConfigTransaction final : public IConfigTransaction {
std::unique_ptr<class SimpleConfigTransactionImpl> impl;
public:

View File

@ -190,11 +190,23 @@ class SimpleConfigDatabaseNodeImpl {
for (const auto& versionedMutation : versionedMutations) {
const auto& mutation = versionedMutation.mutation;
if (mutation.type == MutationRef::Type::SetValue) {
// FIXME: This is very inefficient
Standalone<RangeResultRef> newRange;
bool added = false;
for (auto& kv : range) {
if (kv.key == mutation.param1) {
if (kv.key > mutation.param1 && !added) {
newRange.push_back_deep(newRange.arena(), KeyValueRef(mutation.param1, mutation.param2));
added = true;
} else if (kv.key == mutation.param1) {
kv.value = mutation.param2;
added = true;
}
newRange.push_back_deep(newRange.arena(), kv);
}
if (!added) {
newRange.push_back_deep(newRange.arena(), KeyValueRef(mutation.param1, mutation.param2));
}
range = std::move(newRange);
} else if (mutation.type == MutationRef::Type::ClearRange) {
// FIXME: This is very inefficient
Standalone<RangeResultRef> newRange;

View File

@ -28,60 +28,81 @@
#include "flow/actorcompiler.h" // has to be last include
class ConfigurationDatabaseWorkload : public TestWorkload {
Key key;
int numIncrements;
KeyRange keys;
int initialCycleSize;
int numTransactionsPerClient;
int numClients;
int numBroadcasters;
int numConsumersPerBroadcaster;
double meanSleepBetweenTransactions;
double meanSleepWithinTransaction;
int numSwaps{ 0 };
int transactionTooOldErrors{ 0 };
Promise<int> expectedTotal; // when clients finish, publish expected total value here
Promise<std::map<uint32_t, uint32_t>> finalSnapshot; // when clients finish, publish final snapshot here
ACTOR static Future<int> getCurrentValue(Database cx, Key key) {
state SimpleConfigTransaction tr(cx->getConnectionFile()->getConnectionString());
state int result = 0;
ACTOR static Future<std::map<uint32_t, uint32_t>> getSnapshot(ConfigurationDatabaseWorkload* self, Database cx) {
state std::map<uint32_t, uint32_t> result;
state Reference<IConfigTransaction> tr =
makeReference<SimpleConfigTransaction>(cx->getConnectionFile()->getConnectionString());
loop {
try {
Optional<Value> value = wait(tr.get(key));
if (value.present()) {
result = BinaryReader::fromStringRef<int>(value.get(), Unversioned());
Standalone<RangeResultRef> range = wait(tr->getRange(self->keys));
for (const auto& kv : range) {
result[self->fromKey(kv.key)] = self->fromKey(kv.value);
}
return result;
} catch (Error &e) {
wait(tr.onError(e));
wait(tr->onError(e));
}
}
}
ACTOR static Future<Void> increment(ConfigurationDatabaseWorkload* self, Database cx) {
state SimpleConfigTransaction tr(cx->getConnectionFile()->getConnectionString());
Key toKey(uint32_t index) const { return BinaryWriter::toValue(index, Unversioned()).withPrefix(keys.begin); }
uint32_t fromKey(KeyRef key) const {
return BinaryReader::fromStringRef<uint32_t>(key.removePrefix(keys.begin), Unversioned());
}
ACTOR static Future<int> getCycleLength(ConfigurationDatabaseWorkload const* self,
Reference<IConfigTransaction> tr) {
state Standalone<RangeResultRef> range = wait(tr->getRange(self->keys));
return range.size();
}
ACTOR static Future<Void> cycleSwap(ConfigurationDatabaseWorkload* self, Database cx) {
state Reference<IConfigTransaction> tr =
makeReference<SimpleConfigTransaction>(cx->getConnectionFile()->getConnectionString());
loop {
try {
state int currentValue = 0;
Optional<Value> value = wait(tr.get(self->key));
if (value.present()) {
currentValue = BinaryReader::fromStringRef<int>(value.get(), Unversioned());
}
++currentValue;
tr.set(self->key, BinaryWriter::toValue(currentValue, Unversioned()));
wait(delay(2 * self->meanSleepWithinTransaction * deterministicRandom()->random01()));
wait(tr.commit());
int length = wait(getCycleLength(self, tr));
state Key k0 = self->toKey(deterministicRandom()->randomInt(0, length));
Optional<Value> _k1 = wait(tr->get(k0));
state Key k1 = _k1.get();
wait(delay(deterministicRandom()->random01() * self->meanSleepWithinTransaction));
Optional<Value> _k2 = wait(tr->get(k1));
state Key k2 = _k2.get();
Optional<Value> _k3 = wait(tr->get(k2));
state Key k3 = _k3.get();
tr->set(k0, k2);
tr->set(k1, k3);
tr->set(k2, k1);
wait(delay(deterministicRandom()->random01() * self->meanSleepWithinTransaction));
wait(tr->commit());
++self->numSwaps;
return Void();
} catch (Error &e) {
if (e.code() == error_code_transaction_too_old) {
++self->transactionTooOldErrors;
}
wait(tr.onError(e));
wait(tr->onError(e));
}
}
}
ACTOR static Future<Void> runClient(ConfigurationDatabaseWorkload* self, Database cx) {
state int i = 0;
for (; i < self->numIncrements; ++i) {
wait(increment(self, cx));
for (; i < self->numTransactionsPerClient; ++i) {
wait(cycleSwap(self, cx));
wait(delay(2 * self->meanSleepBetweenTransactions * deterministicRandom()->random01()));
}
return Void();
@ -93,8 +114,8 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
clients.push_back(runClient(self, cx));
}
wait(waitForAll(clients));
int expectedTotal = wait(getCurrentValue(cx, self->key));
self->expectedTotal.send(expectedTotal);
state std::map<uint32_t, uint32_t> finalSnapshot = wait(getSnapshot(self, cx));
self->finalSnapshot.send(finalSnapshot);
return Void();
}
@ -104,10 +125,14 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
}
ACTOR static Future<Void> runConsumer(ConfigurationDatabaseWorkload* self, Reference<ConfigFollowerInterface> cfi) {
state std::map<Key, Value> database;
state std::map<uint32_t, uint32_t> database;
state Version mostRecentVersion = wait(getCurrentVersion(cfi));
state Future<int> expectedTotal = self->expectedTotal.getFuture();
state int currentValue = 0;
ConfigFollowerGetFullDatabaseReply reply =
wait(cfi->getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{ mostRecentVersion, {} }));
for (const auto& [k, v] : reply.database) {
database[self->fromKey(k)] = self->fromKey(v);
}
state Future<std::map<uint32_t, uint32_t>> finalSnapshot = self->finalSnapshot.getFuture();
loop {
state ConfigFollowerGetChangesReply changesReply =
wait(cfi->getChanges.getReply(ConfigFollowerGetChangesRequest{ mostRecentVersion, {} }));
@ -116,17 +141,15 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
for (const auto& versionedMutation : changesReply.versionedMutations) {
const auto& mutation = versionedMutation.mutation;
if (mutation.type == MutationRef::SetValue) {
database[mutation.param1] = mutation.param2;
database[self->fromKey(mutation.param1)] = self->fromKey(mutation.param2);
} else if (mutation.type == MutationRef::ClearRange) {
database.erase(database.find(mutation.param1), database.find(mutation.param2));
database.erase(database.find(self->fromKey(mutation.param1)),
database.find(self->fromKey(mutation.param2)));
} else {
ASSERT(false);
}
}
if (database.count(self->key)) {
currentValue = BinaryReader::fromStringRef<int>(database[self->key], Unversioned());
}
if (expectedTotal.isReady() && currentValue >= expectedTotal.get()) {
if (finalSnapshot.isReady() && database == finalSnapshot.get()) {
return Void();
}
wait(delayJittered(0.5));
@ -147,6 +170,22 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
return Void();
}
ACTOR static Future<Void> setup(ConfigurationDatabaseWorkload* self, Database cx) {
state Reference<IConfigTransaction> tr =
makeReference<SimpleConfigTransaction>(cx->getConnectionFile()->getConnectionString());
loop {
try {
for (int i = 0; i < self->initialCycleSize; ++i) {
tr->set(self->toKey(i), self->toKey((i + 1) % self->initialCycleSize));
}
wait(tr->commit());
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR static Future<Void> start(ConfigurationDatabaseWorkload *self, Database cx) {
state std::vector<Future<Void>> futures;
futures.push_back(runClients(self, cx));
@ -159,22 +198,36 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
public:
ConfigurationDatabaseWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
numIncrements = getOption(options, "numIncrements"_sr, 100);
Key keyPrefix = getOption(options, "keyPrefix"_sr, "key"_sr);
keys = KeyRangeRef(keyPrefix.withSuffix("/"_sr), keyPrefix.withSuffix("0"_sr));
initialCycleSize = getOption(options, "initialCycleSize"_sr, 10);
numTransactionsPerClient = getOption(options, "numTransactionsPerClient"_sr, 100);
numClients = getOption(options, "numClients"_sr, 10);
numBroadcasters = getOption(options, "numBroadcasters"_sr, 2);
numConsumersPerBroadcaster = getOption(options, "numConsumersPerBroadcaster"_sr, 2);
meanSleepBetweenTransactions = getOption(options, "meanSleepBetweenIncrements"_sr, 0.1);
meanSleepBetweenTransactions = getOption(options, "meanSleepBetweenTransactions"_sr, 0.1);
meanSleepWithinTransaction = getOption(options, "meanSleepWithinTransaction"_sr, 0.01);
}
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> setup(Database const& cx) override { return clientId ? Void() : setup(this, cx); }
Future<Void> start(Database const& cx) override { return clientId ? Void() : start(this, cx); }
Future<bool> check(Database const& cx) override {
return clientId ? true : (expectedTotal.getFuture().get() >= numClients * numIncrements);
if (clientId > 0) {
return true;
}
// Validate cycle invariant
auto snapshot = finalSnapshot.getFuture().get();
int current = 0;
for (int i = 0; i < snapshot.size(); ++i) {
if (i > 0 && current == 0)
return false;
current = snapshot[current];
}
return (current == 0);
}
std::string description() const override { return "ConfigurationDatabase"; }
void getMetrics(std::vector<PerfMetric>& m) override {
if (clientId == 0) {
m.push_back(PerfMetric("TotalWrites", expectedTotal.getFuture().get(), false));
m.push_back(PerfMetric("Swaps", numSwaps, false));
m.push_back(PerfMetric("TransactionTooOldErrors", transactionTooOldErrors, false));
}
}