Merge pull request #6204 from neethuhaneesha/reuseIterators

RocksDB read range iterator pool to reuse iterators.
neethuhaneesha 2022-01-18 12:59:52 -08:00 committed by GitHub
commit 034b934ecd
3 changed files with 177 additions and 42 deletions
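
At its core, the change swaps per-read iterator construction for a small, mutex-guarded pool that is invalidated on every write. A minimal standalone sketch of that pattern (illustrative only; `SimpleIteratorPool` and its members are hypothetical, not the `ReadIteratorPool` added below):

```cpp
#include <memory>
#include <mutex>
#include <unordered_map>

#include <rocksdb/db.h>

// Hands out idle iterators for reuse; invalidated wholesale on writes so
// that later reads observe fresh data.
class SimpleIteratorPool {
public:
    struct Pooled {
        uint64_t id;
        std::shared_ptr<rocksdb::Iterator> iter;
    };

    explicit SimpleIteratorPool(rocksdb::DB* db) : db(db) {}

    // Reuse an idle iterator if one exists; otherwise create a new one.
    Pooled acquire() {
        std::lock_guard<std::mutex> lock(mutex);
        for (auto& [id, slot] : slots) {
            if (!slot.inUse) {
                slot.inUse = true;
                return { id, slot.iter };
            }
        }
        uint64_t id = ++nextId;
        Slot slot{ std::shared_ptr<rocksdb::Iterator>(db->NewIterator(rocksdb::ReadOptions())), true };
        Pooled out{ id, slot.iter };
        slots.emplace(id, std::move(slot));
        return out;
    }

    // Mark the iterator idle again; a no-op if invalidate() already dropped it.
    void release(const Pooled& p) {
        std::lock_guard<std::mutex> lock(mutex);
        auto found = slots.find(p.id);
        if (found != slots.end())
            found->second.inUse = false;
    }

    // Call after every committed write batch. In-flight readers keep their
    // iterator alive through the shared_ptr; it is freed once they finish.
    void invalidate() {
        std::lock_guard<std::mutex> lock(mutex);
        slots.clear();
    }

private:
    struct Slot {
        std::shared_ptr<rocksdb::Iterator> iter;
        bool inUse;
    };

    rocksdb::DB* db;
    std::mutex mutex;
    std::unordered_map<uint64_t, Slot> slots;
    uint64_t nextId = 0;
};
```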


@@ -355,6 +355,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 100 );
init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 50 );
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
// Leader election
bool longLeaderElection = randomize && BUGGIFY;


@@ -289,6 +289,8 @@ public:
// These histograms are in read and write path which can cause performance overhead.
// Set to 0 to disable histograms.
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
// Leader election
int MAX_NOTIFICATIONS;


@@ -35,6 +35,7 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 :
"Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
namespace {
using DB = rocksdb::DB*;
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency");
@@ -130,10 +131,125 @@ rocksdb::ReadOptions getReadOptions() {
return options;
}
struct ReadIterator {
uint64_t index; // incrementing counter to uniquely identify read iterator.
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
ReadIterator(uint64_t index, DB& db, rocksdb::ReadOptions& options)
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options)) {}
};
/*
ReadIteratorPool: a collection of iterators that are reused across non-concurrent read operations,
instead of being created and deleted for every read.
Read: the pool hands out an unused iterator if one exists, otherwise it creates a new one. The
iterator is returned to the pool once the read is done.
Write: iterators in the pool are deleted, forcing new iterator creation on subsequent reads. Reads
that currently hold an iterator can keep using it, since it is a shared_ptr; once such a read
completes, the shared_ptr goes out of scope and the iterator object is eventually deleted when its
reference count drops to zero.
*/
class ReadIteratorPool {
public:
ReadIteratorPool(DB& db, const std::string& path)
: db(db), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) {
readRangeOptions.background_purge_on_iterator_cleanup = true;
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
TraceEvent("ReadIteratorPool")
.detail("Path", path)
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
}
// Called on every db commit.
void update() {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
iteratorsMap.clear();
}
}
// Called on every read operation.
ReadIterator getIterator() {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
if (!it->second.inUse) {
it->second.inUse = true;
iteratorsReuseCount++;
return it->second;
}
}
index++;
ReadIterator iter(index, db, readRangeOptions);
iteratorsMap.insert({ index, iter });
return iter;
} else {
index++;
ReadIterator iter(index, db, readRangeOptions);
return iter;
}
}
// Called on every read operation, after the keys are collected.
void returnIterator(ReadIterator& iter) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.find(iter.index);
// Iterator found: return it to the pool (inUse = false).
// Iterator not found: update() already removed it from the pool, so there is nothing to do.
if (it != iteratorsMap.end()) {
ASSERT(it->second.inUse);
it->second.inUse = false;
}
}
}
// Called every ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME seconds, in a loop.
void refreshIterators() {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.begin();
while (it != iteratorsMap.end()) {
if (now() - it->second.creationTime > SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME) {
it = iteratorsMap.erase(it);
} else {
it++;
}
}
}
uint64_t numReadIteratorsCreated() { return index; }
uint64_t numTimesReadIteratorsReused() { return iteratorsReuseCount; }
private:
std::unordered_map<uint64_t, ReadIterator> iteratorsMap; // keyed by ReadIterator::index
std::unordered_map<uint64_t, ReadIterator>::iterator it;
DB& db;
rocksdb::ReadOptions readRangeOptions;
std::mutex mutex;
// Incremented on every iterator creation; uniquely identifies an iterator for returnIterator().
uint64_t index;
uint64_t iteratorsReuseCount;
};
ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
readIterPool->refreshIterators();
}
}
return Void();
}
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e("RocksDBFlowLock");
e.detail("ReadAvailable", readLock->available());
e.detail("ReadActivePermits", readLock->activePermits());
e.detail("ReadWaiters", readLock->waiters());
@@ -143,7 +259,9 @@ ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetc
}
}
ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics,
rocksdb::DB* db,
std::shared_ptr<ReadIteratorPool> readIterPool) {
state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
{ "StallMicros", rocksdb::STALL_MICROS, 0 },
{ "BytesRead", rocksdb::BYTES_READ, 0 },
@@ -206,22 +324,37 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
{ "BaseLevel", rocksdb::DB::Properties::kBaseLevel },
{ "EstPendCompactBytes", rocksdb::DB::Properties::kEstimatePendingCompactionBytes },
};
state std::unordered_map<std::string, uint64_t> readIteratorPoolStats = {
{ "NumReadIteratorsCreated", 0 },
{ "NumTimesReadIteratorsReused", 0 },
};
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e("RocksDBMetrics");
uint64_t stat;
for (auto& t : tickerStats) {
auto& [name, ticker, cum] = t;
stat = statistics->getTickerCount(ticker);
e.detail(name, stat - cum);
cum = stat;
}
for (auto& p : propertyStats) {
auto& [name, property] = p;
stat = 0;
ASSERT(db->GetIntProperty(property, &stat));
e.detail(name, stat);
}
stat = readIterPool->numReadIteratorsCreated();
e.detail("NumReadIteratorsCreated", stat - readIteratorPoolStats["NumReadIteratorsCreated"]);
readIteratorPoolStats["NumReadIteratorsCreated"] = stat;
stat = readIterPool->numTimesReadIteratorsReused();
e.detail("NumTimesReadIteratorsReused", stat - readIteratorPoolStats["NumTimesReadIteratorsReused"]);
readIteratorPoolStats["NumTimesReadIteratorsReused"] = stat;
}
}
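
RocksDB ticker statistics and the new pool counters are cumulative, so the logger keeps the previous sample and emits per-interval deltas. The same pattern in isolation (a generic sketch; `DeltaReporter` is hypothetical):

```cpp
#include <cstdint>
#include <cstdio>

// Emits the increase since the last sample rather than the running total.
struct DeltaReporter {
    uint64_t last = 0;
    void report(const char* name, uint64_t cumulative) {
        std::printf("%s: %llu\n", name, (unsigned long long)(cumulative - last));
        last = cumulative;
    }
};

// e.g. one reporter per counter:
//     createdReporter.report("NumReadIteratorsCreated", pool.numReadIteratorsCreated());
```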
@@ -245,7 +378,6 @@ Error statusToError(const rocksdb::Status& s) {
}
struct RocksDBKeyValueStore : IKeyValueStore {
using CF = rocksdb::ColumnFamilyHandle*;
struct Writer : IThreadPoolReceiver {
@@ -256,11 +388,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<Histogram> commitQueueWaitHistogram;
Reference<Histogram> writeHistogram;
Reference<Histogram> deleteCompactRangeHistogram;
std::shared_ptr<ReadIteratorPool> readIterPool;
explicit Writer(DB& db, UID id, std::shared_ptr<ReadIteratorPool> readIterPool)
: db(db), id(id), readIterPool(readIterPool),
commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_COMMIT_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
commitActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_COMMIT_ACTION_HISTOGRAM,
Histogram::Unit::microseconds)),
@@ -306,16 +440,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
logRocksDBError(status, "Open");
a.done.sendError(statusToError(status));
} else {
TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Open");
if (g_network->isSimulated()) {
// The current thread and the main thread are the same when the code runs in simulation.
// blockUntilReady() would deadlock the thread in that case, so call the metricsLogger
// directly instead.
a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
} else {
onMainThread([&] {
a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
return Future<bool>(true);
}).blockUntilReady();
}
@@ -369,6 +504,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double writeBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto s = db->Write(options, a.batchToCommit.get());
readIterPool->update();
if (a.getHistograms) {
writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime);
}
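
The readIterPool->update() right after db->Write() is what bounds staleness: a RocksDB iterator reads from an implicit snapshot taken at creation time and never observes later writes. A standalone sketch of that behavior (assumes a scratch database at the hypothetical path /tmp/iter_demo):

```cpp
#include <cassert>
#include <memory>

#include <rocksdb/db.h>

int main() {
    rocksdb::DB* raw = nullptr;
    rocksdb::Options opts;
    opts.create_if_missing = true;
    assert(rocksdb::DB::Open(opts, "/tmp/iter_demo", &raw).ok());
    std::unique_ptr<rocksdb::DB> db(raw);

    assert(db->Put(rocksdb::WriteOptions(), "a", "1").ok());
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
    assert(db->Put(rocksdb::WriteOptions(), "b", "2").ok()); // written after iterator creation

    it->Seek("b");
    assert(!it->Valid()); // "b" is invisible: the iterator pins the older snapshot
    return 0;
}
```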
@@ -404,6 +540,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
void action(CloseAction& a) {
readIterPool.reset();
if (db == nullptr) {
a.done.send(Void());
return;
@@ -419,10 +556,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (!s.ok()) {
logRocksDBError(s, "Destroy");
} else {
TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Destroy");
}
}
TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Close");
a.done.send(Void());
}
};
@@ -444,11 +581,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<Histogram> readRangeNewIteratorHistogram;
Reference<Histogram> readValueGetHistogram;
Reference<Histogram> readPrefixGetHistogram;
std::shared_ptr<ReadIteratorPool> readIterPool;
explicit Reader(DB& db, std::shared_ptr<ReadIteratorPool> readIterPool)
: db(db), readIterPool(readIterPool),
readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
readValueLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READVALUE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
@@ -665,24 +804,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
int accumulatedBytes = 0;
rocksdb::Status s;
if (a.rowLimit >= 0) {
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
ReadIterator readIter = readIterPool->getIterator();
if (a.getHistograms) {
readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
}
auto cursor = readIter.iter;
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
@@ -703,16 +831,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
cursor->Next();
}
s = cursor->status();
readIterPool->returnIterator(readIter);
} else {
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
ReadIterator readIter = readIterPool->getIterator();
if (a.getHistograms) {
readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
}
auto cursor = readIter.iter;
cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
@@ -736,6 +862,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
cursor->Prev();
}
s = cursor->status();
readIterPool->returnIterator(readIter);
}
if (!s.ok()) {
@@ -771,6 +898,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
int numReadWaiters;
FlowLock fetchSemaphore;
int numFetchWaiters;
std::shared_ptr<ReadIteratorPool> readIterPool;
struct Counters {
CounterCollection cc;
@@ -784,7 +912,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Counters counters;
explicit RocksDBKeyValueStore(const std::string& path, UID id)
: path(path), id(id), readIterPool(new ReadIteratorPool(db, path)),
readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) {
@@ -805,9 +934,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
}
writeThread->addThread(new Writer(db, id, readIterPool), "fdb-rocksdb-wr");
TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(db, readIterPool), "fdb-rocksdb-re");
}
}
@@ -818,6 +948,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
self->metrics.reset();
wait(self->readThreads->stop());
self->readIterPool.reset();
auto a = new Writer::CloseAction(self->path, deleteOnClose);
auto f = a->done.getFuture();
self->writeThread->post(a);
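
One caveat in the readRange changes above: returnIterator() is called manually on each branch, so an early return or exception between getIterator() and returnIterator() would leave the iterator marked inUse until the next commit or refresh clears the pool. A possible hardening, not part of this commit, is a small RAII guard (`IteratorReturner` is hypothetical):

```cpp
// Returns the iterator to the pool on every exit path, including exceptions.
struct IteratorReturner {
    ReadIteratorPool& pool;
    ReadIterator& iter;
    ~IteratorReturner() { pool.returnIterator(iter); }
};

// Usage inside readRange:
//     ReadIterator readIter = readIterPool->getIterator();
//     IteratorReturner guard{ *readIterPool, readIter };
//     ... scan; the guard returns readIter even on early return ...
```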