1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-27 18:37:04 +08:00

Add an enum to IKeyValueStore to indicate the source/priority of the read

This commit is contained in:
Daniel Smith 2021-10-15 14:35:59 -04:00
parent 0e4a6cc921
commit df53cc9580
7 changed files with 99 additions and 51 deletions

@ -47,16 +47,30 @@ public:
virtual Future<Void> commit(
bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable
virtual Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) = 0;
enum class ReadType {
EAGER,
FETCH,
LOW,
NORMAL,
HIGH,
};
virtual Future<Optional<Value>> readValue(KeyRef key,
Optional<UID> debugID = Optional<UID>(),
ReadType type = ReadType::NORMAL) = 0;
// Like readValue(), but returns only the first maxLength bytes of the value if it is longer
virtual Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) = 0;
Optional<UID> debugID = Optional<UID>(),
ReadType type = ReadType::NORMAL) = 0;
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) = 0;
virtual Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
ReadType type = ReadType::NORMAL) = 0;
// To debug MEMORY_RADIXTREE type ONLY
// Returns (1) how many key & value pairs have been inserted (2) how many nodes have been created (3) how many

@ -56,7 +56,7 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
void clear(KeyRangeRef range, const Arena* arena = nullptr) override { store->clear(range, arena); }
Future<Void> commit(bool sequential = false) override { return store->commit(sequential); }
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID, IKeyValueStore::ReadType) override {
return doReadValue(store, key, debugID);
}
@ -66,13 +66,14 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
// reason, you will need to fix this.
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
Optional<UID> debugID,
IKeyValueStore::ReadType) override {
return doReadValuePrefix(store, key, maxLength, debugID);
}
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
return doReadRange(store, keys, rowLimit, byteLimit);
}

@ -194,7 +194,7 @@ public:
return c;
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID, IKeyValueStore::ReadType) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -208,7 +208,8 @@ public:
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
Optional<UID> debugID,
IKeyValueStore::ReadType) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -227,7 +228,7 @@ public:
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -826,18 +827,18 @@ private:
ACTOR static Future<Optional<Value>> waitAndReadValue(KeyValueStoreMemory* self, Key key) {
wait(self->recovering);
return self->readValue(key).get();
return static_cast<IKeyValueStore*>(self)->readValue(key).get();
}
ACTOR static Future<Optional<Value>> waitAndReadValuePrefix(KeyValueStoreMemory* self, Key key, int maxLength) {
wait(self->recovering);
return self->readValuePrefix(key, maxLength).get();
return static_cast<IKeyValueStore*>(self)->readValuePrefix(key, maxLength).get();
}
ACTOR static Future<RangeResult> waitAndReadRange(KeyValueStoreMemory* self,
KeyRange keys,
int rowLimit,
int byteLimit) {
wait(self->recovering);
return self->readRange(keys, rowLimit, byteLimit).get();
return static_cast<IKeyValueStore*>(self)->readRange(keys, rowLimit, byteLimit).get();
}
ACTOR static Future<Void> waitAndCommit(KeyValueStoreMemory* self, bool sequential) {
wait(self->recovering);

@ -645,21 +645,24 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return res;
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID, IKeyValueStore::ReadType) override {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override {
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID,
IKeyValueStore::ReadType) override {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);

@ -1577,9 +1577,12 @@ public:
void clear(KeyRangeRef range, const Arena* arena = nullptr) override;
Future<Void> commit(bool sequential = false) override;
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override;
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override;
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override;
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID, IKeyValueStore::ReadType) override;
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID,
IKeyValueStore::ReadType) override;
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override;
KeyValueStoreSQLite(std::string const& filename,
UID logID,
@ -2192,21 +2195,27 @@ Future<Void> KeyValueStoreSQLite::commit(bool sequential) {
writeThread->post(p);
return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValue(KeyRef key, Optional<UID> debugID) {
Future<Optional<Value>> KeyValueStoreSQLite::readValue(KeyRef key, Optional<UID> debugID, IKeyValueStore::ReadType) {
++readsRequested;
auto p = new Reader::ReadValueAction(key, debugID);
auto f = p->result.getFuture();
readThreads->post(p);
return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) {
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID,
IKeyValueStore::ReadType) {
++readsRequested;
auto p = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto f = p->result.getFuture();
readThreads->post(p);
return f;
}
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys, int rowLimit, int byteLimit) {
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
IKeyValueStore::ReadType) {
++readsRequested;
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto f = p->result.getFuture();

@ -7078,7 +7078,7 @@ public:
m_tree->set(keyValue);
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
debug_printf("READRANGE %s\n", printable(keys).c_str());
return catchError(readRange_impl(this, keys, rowLimit, byteLimit));
}
@ -7245,13 +7245,14 @@ public:
return Optional<Value>();
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID, IKeyValueStore::ReadType) override {
return catchError(readValue_impl(this, key, debugID));
}
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
Optional<UID> debugID,
IKeyValueStore::ReadType) override {
return catchError(map(readValue_impl(this, key, debugID), [maxLength](Optional<Value> v) {
if (v.present() && v.get().size() > maxLength) {
v.get().contents() = v.get().substr(0, maxLength);

@ -195,15 +195,25 @@ struct StorageServerDisk {
Future<Void> commit() { return storage->commit(); }
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
Future<Key> readNextKeyInclusive(KeyRef key) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) {
return storage->readValue(key, debugID);
Future<Key> readNextKeyInclusive(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), type);
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>()) {
return storage->readValuePrefix(key, maxLength, debugID);
Future<Optional<Value>> readValue(KeyRef key,
Optional<UID> debugID = Optional<UID>(),
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
return storage->readValue(key, debugID, type);
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) {
return storage->readRange(keys, rowLimit, byteLimit);
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>(),
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
return storage->readValuePrefix(key, maxLength, debugID, type);
}
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
return storage->readRange(keys, rowLimit, byteLimit, type);
}
KeyValueStoreType getKeyValueStoreType() const { return storage->getType(); }
@ -216,8 +226,8 @@ private:
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range) {
RangeResult r = wait(storage->readRange(range, 1));
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, IKeyValueStore::ReadType type) {
RangeResult r = wait(storage->readRange(range, 1, 1 << 30, type));
if (r.size())
return r[0].key;
else
@ -1282,7 +1292,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
path = 1;
} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
path = 2;
Optional<Value> vv = wait(data->storage.readValue(req.key, req.debugID));
Optional<Value> vv = wait(data->storage.readValue(req.key, req.debugID, IKeyValueStore::ReadType::NORMAL));
// Validate that while we were reading the data we didn't lose the version or shard
if (version < data->storageVersion()) {
TEST(true); // transaction_too_old after readValue
@ -1631,7 +1641,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
KeyRange range,
int limit,
int* pLimitBytes,
SpanID parentSpan) {
SpanID parentSpan,
IKeyValueStore::ReadType type) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
state StorageServer::VersionedData::iterator vCurrent = view.end();
@ -1695,7 +1706,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
// Read the data on disk up to vCurrent (or the end of the range)
readEnd = vCurrent ? std::min(vCurrent.key(), range.end) : range.end;
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
ASSERT(atStorageVersion.size() <= limit);
if (data->storageVersion() > version)
@ -1776,7 +1787,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
readBegin = vCurrent ? std::max(vCurrent->isClearTo() ? vCurrent->getEndKey() : vCurrent.key(), range.begin)
: range.begin;
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version)
@ -1833,7 +1844,8 @@ ACTOR Future<Key> findKey(StorageServer* data,
Version version,
KeyRange range,
int* pOffset,
SpanID parentSpan)
SpanID parentSpan,
IKeyValueStore::ReadType type)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
@ -1871,7 +1883,8 @@ ACTOR Future<Key> findKey(StorageServer* data,
forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())),
(distance + skipEqualKey) * sign,
&maxBytes,
span.context));
span.context,
type));
state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
// If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in
@ -1879,8 +1892,8 @@ ACTOR Future<Key> findKey(StorageServer* data,
if (more && !forward && rep.data.size() == 1) {
TEST(true); // Reverse key selector returned only one result in range read
maxBytes = std::numeric_limits<int>::max();
GetKeyValuesReply rep2 = wait(
readRange(data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span.context));
GetKeyValuesReply rep2 = wait(readRange(
data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span.context, type));
rep = rep2;
more = rep.more && rep.data.size() != distance + skipEqualKey;
ASSERT(rep.data.size() == 2 || !more);
@ -1945,6 +1958,8 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
{
state Span span("SS:getKeyValues"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
++data->counters.getRangeQueries;
@ -1989,10 +2004,10 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
? Future<Key>(req.begin.getKey())
: findKey(data, req.begin, version, shard, &offset1, span.context);
: findKey(data, req.begin, version, shard, &offset1, span.context, type);
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual()
? Future<Key>(req.end.getKey())
: findKey(data, req.end, version, shard, &offset2, span.context);
: findKey(data, req.end, version, shard, &offset2, span.context, type);
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
@ -2032,8 +2047,8 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
} else {
state int remainingLimitBytes = req.limitBytes;
GetKeyValuesReply _r =
wait(readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context));
GetKeyValuesReply _r = wait(
readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context, type));
GetKeyValuesReply r = _r;
if (req.debugID.present())
@ -2110,6 +2125,8 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
{
state Span span("SS:getKeyValuesStream"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
++data->counters.getRangeStreamQueries;
@ -2155,10 +2172,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
? Future<Key>(req.begin.getKey())
: findKey(data, req.begin, version, shard, &offset1, span.context);
: findKey(data, req.begin, version, shard, &offset1, span.context, type);
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual()
? Future<Key>(req.end.getKey())
: findKey(data, req.end, version, shard, &offset2, span.context);
: findKey(data, req.end, version, shard, &offset2, span.context, type);
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if (req.debugID.present())
@ -2207,7 +2224,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
? 1
: CLIENT_KNOBS->REPLY_BYTE_LIMIT;
GetKeyValuesReply _r =
wait(readRange(data, version, KeyRangeRef(begin, end), req.limit, &byteLimit, span.context));
wait(readRange(data, version, KeyRangeRef(begin, end), req.limit, &byteLimit, span.context, type));
GetKeyValuesStreamReply r(_r);
if (req.debugID.present())
@ -2308,7 +2325,8 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state KeyRange shard = getShardKeyRange(data, req.sel);
state int offset;
Key k = wait(findKey(data, req.sel, version, shard, &offset, req.spanContext));
Key k =
wait(findKey(data, req.sel, version, shard, &offset, req.spanContext, IKeyValueStore::ReadType::NORMAL));
data->checkChangeCounter(
changeCounter, KeyRangeRef(std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k)));
@ -2406,7 +2424,7 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
if (SERVER_KNOBS->ENABLE_CLEAR_RANGE_EAGER_READS) {
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
for (int i = 0; i < keyEnd.size(); i++)
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i]);
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i], IKeyValueStore::ReadType::EAGER);
state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd);
state std::vector<Key> keyEndVal = wait(futureKeyEnds);
@ -2415,7 +2433,8 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
std::vector<Future<Optional<Value>>> value(eager->keys.size());
for (int i = 0; i < value.size(); i++)
value[i] = data->storage.readValuePrefix(eager->keys[i].first, eager->keys[i].second);
value[i] = data->storage.readValuePrefix(
eager->keys[i].first, eager->keys[i].second, Optional<UID>(), IKeyValueStore::ReadType::EAGER);
state Future<std::vector<Optional<Value>>> futureValues = getAll(value);
std::vector<Optional<Value>> optionalValues = wait(futureValues);