mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 10:22:20 +08:00
Added KeyBackedObjectMap and KeyBackedObjectProperty classes for storing serializable objects in FDB (#5896)
* Cleaned up some lambda capture workaround since x=y captures weren't available when these classes were originally written. * Added KeyBackedObjectMap and KeyBackObjectProperty, which work like KeyBackedMap and KeyBackedProperty but use ObjectWriter/Reader for Value serialization so that the type can evolve over time. * Disabled unit tests which shouldn't run as part of random selection.
This commit is contained in:
parent
86ea63d1da
commit
d97d968176
@ -26,7 +26,9 @@
|
||||
#include "fdbclient/IClientApi.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/Subspace.h"
|
||||
#include "flow/ObjectSerializer.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/serialize.h"
|
||||
|
||||
// Codec is a utility struct to convert a type to and from a Tuple. It is used by the template
|
||||
// classes below like KeyBackedProperty and KeyBackedMap to convert key parts and values
|
||||
@ -168,14 +170,8 @@ public:
|
||||
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr,
|
||||
Snapshot snapshot = Snapshot::False,
|
||||
Error err = key_not_found()) const {
|
||||
auto keyCopy = key;
|
||||
auto backtrace = platform::get_backtrace();
|
||||
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
|
||||
if (!val.present()) {
|
||||
TraceEvent(SevInfo, "KeyBackedProperty_KeyNotFound")
|
||||
.detail("Key", keyCopy)
|
||||
.detail("Err", err.code())
|
||||
.detail("ParentTrace", backtrace.c_str());
|
||||
throw err;
|
||||
}
|
||||
|
||||
@ -184,45 +180,39 @@ public:
|
||||
}
|
||||
|
||||
Future<Optional<T>> get(Database cx, Snapshot snapshot = Snapshot::False) const {
|
||||
auto& copy = *this;
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return copy.get(tr, snapshot);
|
||||
return self.get(tr, snapshot);
|
||||
});
|
||||
}
|
||||
|
||||
Future<T> getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
|
||||
auto& copy = *this;
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return copy.getD(tr, snapshot, defaultValue);
|
||||
return self.getD(tr, snapshot, defaultValue);
|
||||
});
|
||||
}
|
||||
|
||||
Future<T> getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
|
||||
auto& copy = *this;
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return copy.getOrThrow(tr, snapshot, err);
|
||||
return self.getOrThrow(tr, snapshot, err);
|
||||
});
|
||||
}
|
||||
|
||||
void set(Reference<ReadYourWritesTransaction> tr, T const& val) { return tr->set(key, Codec<T>::pack(val).pack()); }
|
||||
|
||||
Future<Void> set(Database cx, T const& val) {
|
||||
auto _key = key;
|
||||
Value _val = Codec<T>::pack(val).pack();
|
||||
return runRYWTransaction(cx, [_key, _val](Reference<ReadYourWritesTransaction> tr) {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->set(_key, _val);
|
||||
|
||||
self->set(tr, val);
|
||||
return Future<Void>(Void());
|
||||
});
|
||||
}
|
||||
@ -262,12 +252,12 @@ public:
|
||||
Key key;
|
||||
};
|
||||
|
||||
// Convenient read/write access to a sorted map of KeyType to ValueType that has key as its prefix
|
||||
// Convenient read/write access to a sorted map of KeyType to ValueType under prefix
|
||||
// Even though 'this' is not actually mutated, methods that change db keys are not const.
|
||||
template <typename _KeyType, typename _ValueType>
|
||||
class KeyBackedMap {
|
||||
public:
|
||||
KeyBackedMap(KeyRef key) : space(key) {}
|
||||
KeyBackedMap(KeyRef prefix) : space(prefix) {}
|
||||
|
||||
typedef _KeyType KeyType;
|
||||
typedef _ValueType ValueType;
|
||||
@ -336,6 +326,164 @@ public:
|
||||
Subspace space;
|
||||
};
|
||||
|
||||
// Convenient read/write access to a single value of type T stored at key
|
||||
// Even though 'this' is not actually mutated, methods that change the db key are not const.
|
||||
template <typename T, typename VersionOptions>
|
||||
class KeyBackedObjectProperty {
|
||||
public:
|
||||
KeyBackedObjectProperty(KeyRef key, VersionOptions versionOptions) : key(key), versionOptions(versionOptions) {}
|
||||
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
|
||||
|
||||
return map(tr->get(key, snapshot), [vo = versionOptions](Optional<Value> const& val) -> Optional<T> {
|
||||
if (val.present())
|
||||
return ObjectReader::fromStringRef<T>(val.get(), vo);
|
||||
return {};
|
||||
});
|
||||
}
|
||||
|
||||
// Get property's value or defaultValue if it doesn't exist
|
||||
Future<T> getD(Reference<ReadYourWritesTransaction> tr,
|
||||
Snapshot snapshot = Snapshot::False,
|
||||
T defaultValue = T()) const {
|
||||
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
|
||||
}
|
||||
// Get property's value or throw error if it doesn't exist
|
||||
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr,
|
||||
Snapshot snapshot = Snapshot::False,
|
||||
Error err = key_not_found()) const {
|
||||
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
|
||||
if (!val.present()) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
return val.get();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Optional<T>> get(Database cx, Snapshot snapshot = Snapshot::False) const {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return self.get(tr, snapshot);
|
||||
});
|
||||
}
|
||||
|
||||
Future<T> getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return self.getD(tr, snapshot, defaultValue);
|
||||
});
|
||||
}
|
||||
|
||||
Future<T> getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return self.getOrThrow(tr, snapshot, err);
|
||||
});
|
||||
}
|
||||
|
||||
void set(Reference<ReadYourWritesTransaction> tr, T const& val) {
|
||||
return tr->set(key, ObjectWriter::toValue(val, versionOptions));
|
||||
}
|
||||
|
||||
Future<Void> set(Database cx, T const& val) {
|
||||
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
self.set(tr, val);
|
||||
return Future<Void>(Void());
|
||||
});
|
||||
}
|
||||
|
||||
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); }
|
||||
|
||||
Key key;
|
||||
VersionOptions versionOptions;
|
||||
};
|
||||
|
||||
// Convenient read/write access to a sorted map of KeyType to ValueType under key prefix
|
||||
// ValueType is encoded / decoded with ObjectWriter/ObjectReader
|
||||
// Even though 'this' is not actually mutated, methods that change db keys are not const.
|
||||
template <typename _KeyType, typename _ValueType, typename VersionOptions>
|
||||
class KeyBackedObjectMap {
|
||||
public:
|
||||
KeyBackedObjectMap(KeyRef prefix, VersionOptions versionOptions) : space(prefix), versionOptions(versionOptions) {}
|
||||
|
||||
typedef _KeyType KeyType;
|
||||
typedef _ValueType ValueType;
|
||||
typedef std::pair<KeyType, ValueType> PairType;
|
||||
typedef std::vector<PairType> PairsType;
|
||||
|
||||
// If end is not present one key past the end of the map is used.
|
||||
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr,
|
||||
KeyType const& begin,
|
||||
Optional<KeyType> const& end,
|
||||
int limit,
|
||||
Snapshot snapshot = Snapshot::False,
|
||||
Reverse reverse = Reverse::False) const {
|
||||
Key endKey = end.present() ? space.pack(Codec<KeyType>::pack(end.get())) : space.range().end;
|
||||
return map(
|
||||
tr->getRange(
|
||||
KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse),
|
||||
[self = *this](RangeResult const& kvs) -> PairsType {
|
||||
PairsType results;
|
||||
for (int i = 0; i < kvs.size(); ++i) {
|
||||
KeyType key = Codec<KeyType>::unpack(self.space.unpack(kvs[i].key));
|
||||
ValueType val = ObjectReader::fromStringRef<ValueType>(kvs[i].value, self.versionOptions);
|
||||
results.push_back(PairType(key, val));
|
||||
}
|
||||
return results;
|
||||
});
|
||||
}
|
||||
|
||||
Future<Optional<ValueType>> get(Reference<ReadYourWritesTransaction> tr,
|
||||
KeyType const& key,
|
||||
Snapshot snapshot = Snapshot::False) const {
|
||||
return map(tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot),
|
||||
[vo = versionOptions](Optional<Value> const& val) -> Optional<ValueType> {
|
||||
if (val.present())
|
||||
return ObjectReader::fromStringRef<ValueType>(val.get(), vo);
|
||||
return {};
|
||||
});
|
||||
}
|
||||
|
||||
// Returns a Property that can be get/set that represents key's entry in this this.
|
||||
KeyBackedObjectProperty<ValueType, VersionOptions> getProperty(KeyType const& key) const {
|
||||
return KeyBackedObjectProperty<ValueType, VersionOptions>(space.pack(Codec<KeyType>::pack(key)),
|
||||
versionOptions);
|
||||
}
|
||||
|
||||
// Returns the expectedSize of the set key
|
||||
int set(Reference<ReadYourWritesTransaction> tr, KeyType const& key, ValueType const& val) {
|
||||
Key k = space.pack(Codec<KeyType>::pack(key));
|
||||
Value v = ObjectWriter::toValue(val, versionOptions);
|
||||
tr->set(k, v);
|
||||
return k.expectedSize() + v.expectedSize();
|
||||
}
|
||||
|
||||
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& key) {
|
||||
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
|
||||
}
|
||||
|
||||
void erase(Reference<ITransaction> tr, KeyType const& key) {
|
||||
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
|
||||
}
|
||||
|
||||
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& begin, KeyType const& end) {
|
||||
return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end))));
|
||||
}
|
||||
|
||||
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); }
|
||||
|
||||
Subspace space;
|
||||
VersionOptions versionOptions;
|
||||
};
|
||||
|
||||
template <typename _ValueType>
|
||||
class KeyBackedSet {
|
||||
public:
|
||||
|
@ -1169,7 +1169,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
||||
// DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
|
||||
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D) should
|
||||
// be in removed.
|
||||
TEST_CASE("/blobmanager/updateranges") {
|
||||
TEST_CASE(":/blobmanager/updateranges") {
|
||||
KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
|
||||
Arena ar;
|
||||
|
||||
|
@ -10254,7 +10254,7 @@ ACTOR Future<Void> randomRangeScans(IKeyValueStore* kvs,
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("!/redwood/performance/randomRangeScans") {
|
||||
TEST_CASE(":/redwood/performance/randomRangeScans") {
|
||||
state int prefixLen = 30;
|
||||
state int suffixSize = 12;
|
||||
state int valueSize = 100;
|
||||
|
Loading…
x
Reference in New Issue
Block a user