1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-28 02:48:09 +08:00

Enable compaction on SimpleConfigDatabaseNode

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-24 13:37:15 -07:00
parent 8540889b14
commit 8fa6016287
5 changed files with 97 additions and 30 deletions

@ -31,6 +31,7 @@
class IConfigTransaction : public ReferenceCounted<IConfigTransaction> {
public:
virtual void set(KeyRef key, ValueRef value) = 0;
virtual void clear(KeyRef) = 0;
virtual void clearRange(KeyRef begin, KeyRef end) = 0;
virtual Future<Optional<Value>> get(KeyRef) = 0;
virtual Future<Standalone<RangeResultRef>> getRange(KeyRangeRef) = 0;
@ -48,6 +49,7 @@ public:
SimpleConfigTransaction(ClusterConnectionString const&);
~SimpleConfigTransaction();
void set(KeyRef begin, KeyRef end) override;
void clear(KeyRef) override;
void clearRange(KeyRef begin, KeyRef end) override;
Future<Optional<Value>> get(KeyRef) override;
Future<Standalone<RangeResultRef>> getRange(KeyRangeRef) override;

@ -77,6 +77,11 @@ public:
mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::SetValue, key, value);
}
void clear(KeyRef key) {
mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::ClearRange, key, keyAfter(key));
ASSERT(keyAfter(key) > key);
}
void clearRange(KeyRef begin, KeyRef end) {
mutations.emplace_back_deep(mutations.arena(), MutationRef::Type::ClearRange, begin, end);
}
@ -117,6 +122,10 @@ void SimpleConfigTransaction::set(KeyRef key, ValueRef value) {
impl->set(key, value);
}
void SimpleConfigTransaction::clear(KeyRef key) {
impl->clear(key);
}
void SimpleConfigTransaction::clearRange(KeyRef begin, KeyRef end) {
impl->clearRange(begin, end);
}

@ -30,16 +30,34 @@ class SimpleConfigBroadcasterImpl {
ActorCollection actors{ false };
static const double POLLING_INTERVAL; // TODO: Make knob?
static const double COMPACTION_INTERVAL; // TODO: Make knob?
ACTOR static Future<Void> fetchUpdates(SimpleConfigBroadcasterImpl *self) {
loop {
ConfigFollowerGetChangesReply reply = wait(
self->subscriber->getChanges.getReply(ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} }));
for (const auto &versionedMutation : reply.versionedMutations) {
self->versionedMutations.push_back(versionedMutation);
try {
ConfigFollowerGetChangesReply reply = wait(self->subscriber->getChanges.getReply(
ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} }));
for (const auto& versionedMutation : reply.versionedMutations) {
self->versionedMutations.push_back(versionedMutation);
}
self->mostRecentVersion = reply.mostRecentVersion;
wait(delayJittered(POLLING_INTERVAL));
} catch (Error& e) {
if (e.code() == error_code_version_already_compacted) {
ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber->getFullDatabase.getReply(
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} }));
self->database = reply.database;
} else {
throw e;
}
}
self->mostRecentVersion = reply.mostRecentVersion;
wait(delay(POLLING_INTERVAL));
}
}
ACTOR static Future<Void> compactor(SimpleConfigBroadcasterImpl* self) {
loop {
wait(delayJittered(COMPACTION_INTERVAL));
wait(self->subscriber->compact.getReply(ConfigFollowerCompactRequest{ self->mostRecentVersion }));
}
}
@ -57,11 +75,9 @@ class SimpleConfigBroadcasterImpl {
}
static void removeRange(std::map<Key, Value> &database, KeyRef begin, KeyRef end) {
ASSERT(end >= begin);
auto b = database.lower_bound(begin);
auto e = database.lower_bound(end);
if (e != database.end() && e->first == end) {
++e;
}
database.erase(b, e);
}
@ -73,6 +89,7 @@ class SimpleConfigBroadcasterImpl {
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, Optional<Value>{} }));
self->database = reply.database;
self->actors.add(fetchUpdates(self));
self->actors.add(compactor(self));
loop {
//self->traceQueuedMutations();
choose {
@ -148,6 +165,7 @@ public:
};
const double SimpleConfigBroadcasterImpl::POLLING_INTERVAL = 0.5;
const double SimpleConfigBroadcasterImpl::COMPACTION_INTERVAL = 5.0;
SimpleConfigBroadcaster::SimpleConfigBroadcaster(ClusterConnectionString const& ccs)
: impl(std::make_unique<SimpleConfigBroadcasterImpl>(ccs)) {}

@ -30,6 +30,7 @@
namespace {
const KeyRef lastCompactedVersionKey = LiteralStringRef("lastCompactedVersion");
const KeyRef liveTransactionVersionKey = LiteralStringRef("liveTransactionVersion");
const KeyRef committedVersionKey = LiteralStringRef("committedVersion");
const KeyRangeRef kvKeys = KeyRangeRef(LiteralStringRef("kv/"), LiteralStringRef("kv0"));
@ -45,6 +46,7 @@ Key versionedMutationKey(Version version, uint32_t index) {
Version getVersionFromVersionedMutationKey(KeyRef versionedMutationKey) {
uint64_t bigEndianResult;
ASSERT(versionedMutationKey.startsWith(mutationKeys.begin));
BinaryReader br(versionedMutationKey.removePrefix(mutationKeys.begin), IncludeVersion());
br >> bigEndianResult;
return fromBigEndian64(bigEndianResult);
@ -83,7 +85,6 @@ class SimpleConfigDatabaseNodeImpl {
IKeyValueStore* kvStore; // FIXME: Prevent leak
std::map<std::string, std::string> config;
ActorCollection actors{ false };
Version lastCompactedVersion{ 0 };
FlowLock globalLock;
ACTOR static Future<Version> getLiveTransactionVersion(SimpleConfigDatabaseNodeImpl *self) {
@ -110,6 +111,19 @@ class SimpleConfigDatabaseNodeImpl {
return committedVersion;
}
ACTOR static Future<Version> getLastCompactedVersion(SimpleConfigDatabaseNodeImpl* self) {
Optional<Value> value = wait(self->kvStore->readValue(lastCompactedVersionKey));
state Version lastCompactedVersion = 0;
if (value.present()) {
lastCompactedVersion = BinaryReader::fromStringRef<Version>(value.get(), IncludeVersion());
} else {
self->kvStore->set(
KeyValueRef(lastCompactedVersionKey, BinaryWriter::toValue(lastCompactedVersion, IncludeVersion())));
wait(self->kvStore->commit());
}
return lastCompactedVersion;
}
ACTOR static Future<Standalone<VectorRef<VersionedMutationRef>>> getMutations(SimpleConfigDatabaseNodeImpl *self, Version startVersion, Version endVersion) {
Key startVersionKey = versionedMutationKey(startVersion, 0);
state KeyRangeRef keys(startVersionKey, mutationKeys.end);
@ -127,13 +141,17 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> getChanges(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetChangesRequest req) {
if (req.lastSeenVersion < self->lastCompactedVersion) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
Version lastCompactedVersion = wait(getLastCompactedVersion(self));
if (req.lastSeenVersion < lastCompactedVersion) {
req.reply.sendError(version_already_compacted());
return Void();
}
state Version committedVersion = wait(getCommittedVersion(self));
Standalone<VectorRef<VersionedMutationRef>> mutations = wait(getMutations(self, req.lastSeenVersion+1, committedVersion));
req.reply.send(ConfigFollowerGetChangesReply{committedVersion, mutations});
Standalone<VectorRef<VersionedMutationRef>> versionedMutations =
wait(getMutations(self, req.lastSeenVersion + 1, committedVersion));
req.reply.send(ConfigFollowerGetChangesReply{ committedVersion, versionedMutations });
return Void();
}
@ -189,7 +207,12 @@ class SimpleConfigDatabaseNodeImpl {
req.reply.sendError(transaction_too_old());
return Void();
}
state Standalone<RangeResultRef> range = wait(self->kvStore->readRange(req.keys));
state Standalone<RangeResultRef> range = wait(self->kvStore->readRange(req.keys.withPrefix(kvKeys.begin)));
// FIXME: Inefficient
for (auto& kv : range) {
ASSERT(kv.key.startsWith(kvKeys.begin));
kv.key = kv.key.removePrefix(kvKeys.begin);
}
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
for (const auto& versionedMutation : versionedMutations) {
const auto& mutation = versionedMutation.mutation;
@ -249,6 +272,8 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> traceQueuedMutations(SimpleConfigDatabaseNodeImpl *self) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state Version currentVersion = wait(getCommittedVersion(self));
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, currentVersion));
TraceEvent te("SimpleConfigNodeQueuedMutations");
@ -288,6 +313,8 @@ class SimpleConfigDatabaseNodeImpl {
ACTOR static Future<Void> getFullDatabase(SimpleConfigDatabaseNodeImpl* self,
ConfigFollowerGetFullDatabaseRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state ConfigFollowerGetFullDatabaseReply reply;
Standalone<RangeResultRef> data = wait(self->kvStore->readRange(kvKeys));
for (const auto& kv : data) {
@ -308,10 +335,17 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> compact(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerCompactRequest req) {
// TODO: Lock
Standalone<VectorRef<VersionedMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
if (req.version <= lastCompactedVersion) {
req.reply.send(Void());
return Void();
}
Standalone<VectorRef<VersionedMutationRef>> versionedMutations =
wait(getMutations(self, lastCompactedVersion + 1, req.version));
self->kvStore->clear(
KeyRangeRef(mutationKeys.begin, versionedMutationKey(req.version, 100000))); // FIXME: This is a hack
KeyRangeRef(versionedMutationKey(lastCompactedVersion + 1, 0), versionedMutationKey(req.version + 1, 0)));
for (const auto& versionedMutation : versionedMutations) {
const auto& version = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
@ -319,16 +353,18 @@ class SimpleConfigDatabaseNodeImpl {
req.reply.send(Void());
return Void();
} else if (mutation.type == MutationRef::SetValue) {
self->kvStore->set(KeyValueRef(mutation.param1, mutation.param2));
self->kvStore->set(KeyValueRef(mutation.param1.withPrefix(kvKeys.begin), mutation.param2));
} else if (mutation.type == MutationRef::ClearRange) {
self->kvStore->clear(KeyRangeRef(mutation.param1, mutation.param2));
self->kvStore->clear(
KeyRangeRef(mutation.param1.withPrefix(kvKeys.begin), mutation.param2.withPrefix(kvKeys.begin)));
} else {
ASSERT(false);
}
}
self->kvStore->set(
KeyValueRef(lastCompactedVersionKey, BinaryWriter::toValue(lastCompactedVersion, IncludeVersion())));
wait(self->kvStore->commit());
req.reply.send(Void());
self->lastCompactedVersion = req.version;
return Void();
}
@ -346,7 +382,6 @@ class SimpleConfigDatabaseNodeImpl {
}
when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) {
self->actors.add(compact(self, req));
req.reply.send(Void());
}
when(wait(self->actors.getResult())) { ASSERT(false); }
}

@ -46,6 +46,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
int shrinkCount{ 0 };
int getChangesCount{ 0 };
int getFullDatabaseCount{ 0 };
int compactionCount{ 0 };
Promise<std::map<uint32_t, uint32_t>> finalSnapshot; // when clients finish, publish final snapshot here
ACTOR static Future<std::map<uint32_t, uint32_t>> getSnapshot(ConfigurationDatabaseWorkload* self, Database cx) {
@ -70,6 +71,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
}
uint32_t fromKey(KeyRef key) const {
ASSERT(key.startsWith(keys.begin));
return fromBigEndian32(BinaryReader::fromStringRef<uint32_t>(key.removePrefix(keys.begin), Unversioned()));
}
@ -114,7 +116,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
makeReference<SimpleConfigTransaction>(cx->getConnectionFile()->getConnectionString());
loop {
try {
int length = wait(getCycleLength(self, tr));
state int length = wait(getCycleLength(self, tr));
if (length == self->maxCycleSize) {
return Void();
}
@ -142,6 +144,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
makeReference<SimpleConfigTransaction>(cx->getConnectionFile()->getConnectionString());
loop {
try {
state Version currentVersion = wait(tr->getVersion());
state Standalone<RangeResultRef> range = wait(tr->getRange(self->keys));
if (range.size() == self->minCycleSize) {
return Void();
@ -156,7 +159,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
k2 = kv.value;
}
}
tr->clearRange(k1, k1);
tr->clear(k1);
tr->set(k0, k2);
wait(tr->commit());
++self->shrinkCount;
@ -222,12 +225,10 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
if (mutation.type == MutationRef::SetValue) {
database[self->fromKey(mutation.param1)] = self->fromKey(mutation.param2);
} else if (mutation.type == MutationRef::ClearRange) {
auto b = database.lower_bound(self->fromKey(mutation.param1));
auto e = database.lower_bound(self->fromKey(mutation.param2));
if (e != database.end() && e->first == self->fromKey(mutation.param2)) {
++e;
}
database.erase(b, e);
// FIXME: Here we're assuming all clears are point clears on existing keys
auto it = database.find(self->fromKey(mutation.param1));
ASSERT(it != database.end());
database.erase(it);
} else {
ASSERT(false);
}
@ -261,6 +262,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload {
wait(delay(2 * deterministicRandom()->random01() * self->meanCompactionInterval));
Version version = wait(getCurrentVersion(cfi));
wait(cfi->compact.getReply(ConfigFollowerCompactRequest{ version }));
++self->compactionCount;
}
}
@ -329,7 +331,7 @@ public:
}
// Validate cycle invariant
auto snapshot = finalSnapshot.getFuture().get();
int current = 0;
uint32_t current = 0;
for (int i = 0; i < snapshot.size(); ++i) {
if (i > 0 && current == 0)
return false;
@ -347,6 +349,7 @@ public:
m.push_back(PerfMetric("FinalSize", finalSnapshot.getFuture().get().size(), false));
m.push_back(PerfMetric("GetChangesCount", getChangesCount, false));
m.push_back(PerfMetric("GetFullDatabaseCount", getFullDatabaseCount, false));
m.push_back(PerfMetric("CompactionCount", compactionCount, false));
}
}
};