Rocksdb read range iterator pool to reuse iterators.

This commit is contained in:
Neethu Haneesha Bingi 2022-01-06 07:34:42 -08:00
parent db436fb494
commit ef4038fe8d
3 changed files with 177 additions and 42 deletions

View File

@ -355,6 +355,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 100 ); init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 100 );
init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 50 ); init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 50 );
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0; init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
// Leader election // Leader election
bool longLeaderElection = randomize && BUGGIFY; bool longLeaderElection = randomize && BUGGIFY;

View File

@ -289,6 +289,8 @@ public:
// These histograms are in read and write path which can cause performance overhead. // These histograms are in read and write path which can cause performance overhead.
// Set to 0 to disable histograms. // Set to 0 to disable histograms.
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE; double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
// Leader election // Leader election
int MAX_NOTIFICATIONS; int MAX_NOTIFICATIONS;

View File

@ -35,6 +35,7 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 :
"Unsupported rocksdb version. Update the rocksdb to 6.22.1 version"); "Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
namespace { namespace {
using DB = rocksdb::DB*;
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage"); const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency"); const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency");
@ -130,10 +131,125 @@ rocksdb::ReadOptions getReadOptions() {
return options; return options;
} }
struct ReadIterator {
uint64_t index; // incrementing counter to uniquely identify read iterator.
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
ReadIterator(uint64_t index, DB& db, rocksdb::ReadOptions& options)
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options)) {}
};
/*
ReadIteratorPool: Collection of iterators. Reuses iterators on non-concurrent multiple read operations,
instead of creating and deleting for every read.
Read: IteratorPool provides an unused iterator if exists or creates and gives a new iterator.
Returns back the iterator after the read is done.
Write: Iterators in the pool are deleted, forcing new iterator creation on next reads. The iterators
which are currently used by the reads can continue using the iterator as it is a shared_ptr. Once
the read is processed, shared_ptr goes out of scope and gets deleted. Eventually the iterator object
gets deleted as the ref count becomes 0.
*/
class ReadIteratorPool {
public:
ReadIteratorPool(DB& db, const std::string& path)
: db(db), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) {
readRangeOptions.background_purge_on_iterator_cleanup = true;
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
TraceEvent("ReadIteratorPool")
.detail("Path", path)
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
}
// Called on every db commit.
void update() {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
iteratorsMap.clear();
}
}
// Called on every read operation.
ReadIterator getIterator() {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
if (!it->second.inUse) {
it->second.inUse = true;
iteratorsReuseCount++;
return it->second;
}
}
index++;
ReadIterator iter(index, db, readRangeOptions);
iteratorsMap.insert({ index, iter });
return iter;
} else {
index++;
ReadIterator iter(index, db, readRangeOptions);
return iter;
}
}
// Called on every read operation, after the keys are collected.
void returnIterator(ReadIterator& iter) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.find(iter.index);
// iterator found: put the iterator back to the pool(inUse=false).
// iterator not found: update would have removed the iterator from pool, so nothing to do.
if (it != iteratorsMap.end()) {
ASSERT(it->second.inUse);
it->second.inUse = false;
}
}
}
// Called for every ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME seconds in a loop.
void refreshIterators() {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.begin();
while (it != iteratorsMap.end()) {
if (now() - it->second.creationTime > SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME) {
it = iteratorsMap.erase(it);
} else {
it++;
}
}
}
uint64_t numReadIteratorsCreated() { return index; }
uint64_t numTimesReadIteratorsReused() { return iteratorsReuseCount; }
private:
std::unordered_map<int, ReadIterator> iteratorsMap;
std::unordered_map<int, ReadIterator>::iterator it;
DB& db;
rocksdb::ReadOptions readRangeOptions;
std::mutex mutex;
// incrementing counter for every new iterator creation, to uniquely identify the iterator in returnIterator().
uint64_t index;
uint64_t iteratorsReuseCount;
};
ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
readIterPool->refreshIterators();
}
}
return Void();
}
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) { ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
loop { loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY)); wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e(SevInfo, "RocksDBFlowLock"); TraceEvent e("RocksDBFlowLock");
e.detail("ReadAvailable", readLock->available()); e.detail("ReadAvailable", readLock->available());
e.detail("ReadActivePermits", readLock->activePermits()); e.detail("ReadActivePermits", readLock->activePermits());
e.detail("ReadWaiters", readLock->waiters()); e.detail("ReadWaiters", readLock->waiters());
@ -143,7 +259,9 @@ ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetc
} }
} }
ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics, rocksdb::DB* db) { ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics,
rocksdb::DB* db,
std::shared_ptr<ReadIteratorPool> readIterPool) {
state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = { state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
{ "StallMicros", rocksdb::STALL_MICROS, 0 }, { "StallMicros", rocksdb::STALL_MICROS, 0 },
{ "BytesRead", rocksdb::BYTES_READ, 0 }, { "BytesRead", rocksdb::BYTES_READ, 0 },
@ -206,22 +324,37 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
{ "BaseLevel", rocksdb::DB::Properties::kBaseLevel }, { "BaseLevel", rocksdb::DB::Properties::kBaseLevel },
{ "EstPendCompactBytes", rocksdb::DB::Properties::kEstimatePendingCompactionBytes }, { "EstPendCompactBytes", rocksdb::DB::Properties::kEstimatePendingCompactionBytes },
}; };
state std::unordered_map<std::string, uint64_t> readIteratorPoolStats = {
{ "NumReadIteratorsCreated", 0 },
{ "NumTimesReadIteratorsReused", 0 },
};
loop { loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY)); wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e("RocksDBMetrics"); TraceEvent e("RocksDBMetrics");
uint64_t stat;
for (auto& t : tickerStats) { for (auto& t : tickerStats) {
auto& [name, ticker, cum] = t; auto& [name, ticker, cum] = t;
uint64_t val = statistics->getTickerCount(ticker); stat = statistics->getTickerCount(ticker);
e.detail(name, val - cum); e.detail(name, stat - cum);
cum = val; cum = stat;
} }
for (auto& p : propertyStats) { for (auto& p : propertyStats) {
auto& [name, property] = p; auto& [name, property] = p;
uint64_t stat = 0; stat = 0;
ASSERT(db->GetIntProperty(property, &stat)); ASSERT(db->GetIntProperty(property, &stat));
e.detail(name, stat); e.detail(name, stat);
} }
stat = readIterPool->numReadIteratorsCreated();
e.detail("NumReadIteratorsCreated", stat - readIteratorPoolStats["NumReadIteratorsCreated"]);
readIteratorPoolStats["NumReadIteratorsCreated"] = stat;
stat = readIterPool->numTimesReadIteratorsReused();
e.detail("NumTimesReadIteratorsReused", stat - readIteratorPoolStats["NumTimesReadIteratorsReused"]);
readIteratorPoolStats["NumTimesReadIteratorsReused"] = stat;
} }
} }
@ -245,7 +378,6 @@ Error statusToError(const rocksdb::Status& s) {
} }
struct RocksDBKeyValueStore : IKeyValueStore { struct RocksDBKeyValueStore : IKeyValueStore {
using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*; using CF = rocksdb::ColumnFamilyHandle*;
struct Writer : IThreadPoolReceiver { struct Writer : IThreadPoolReceiver {
@ -256,11 +388,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<Histogram> commitQueueWaitHistogram; Reference<Histogram> commitQueueWaitHistogram;
Reference<Histogram> writeHistogram; Reference<Histogram> writeHistogram;
Reference<Histogram> deleteCompactRangeHistogram; Reference<Histogram> deleteCompactRangeHistogram;
std::shared_ptr<ReadIteratorPool> readIterPool;
explicit Writer(DB& db, UID id) explicit Writer(DB& db, UID id, std::shared_ptr<ReadIteratorPool> readIterPool)
: db(db), id(id), commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP, : db(db), id(id), readIterPool(readIterPool),
ROCKSDB_COMMIT_LATENCY_HISTOGRAM, commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
Histogram::Unit::microseconds)), ROCKSDB_COMMIT_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
commitActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP, commitActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_COMMIT_ACTION_HISTOGRAM, ROCKSDB_COMMIT_ACTION_HISTOGRAM,
Histogram::Unit::microseconds)), Histogram::Unit::microseconds)),
@ -306,16 +440,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
logRocksDBError(status, "Open"); logRocksDBError(status, "Open");
a.done.sendError(statusToError(status)); a.done.sendError(statusToError(status));
} else { } else {
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Open"); TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Open");
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
// The current thread and main thread are same when the code runs in simulation. // The current thread and main thread are same when the code runs in simulation.
// blockUntilReady() is getting the thread into deadlock state, so directly calling // blockUntilReady() is getting the thread into deadlock state, so directly calling
// the metricsLogger. // the metricsLogger.
a.metrics = rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock); a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
} else { } else {
onMainThread([&] { onMainThread([&] {
a.metrics = a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock); flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
return Future<bool>(true); return Future<bool>(true);
}).blockUntilReady(); }).blockUntilReady();
} }
@ -369,6 +504,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double writeBeginTime = a.getHistograms ? timer_monotonic() : 0; double writeBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto s = db->Write(options, a.batchToCommit.get()); auto s = db->Write(options, a.batchToCommit.get());
readIterPool->update();
if (a.getHistograms) { if (a.getHistograms) {
writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime); writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime);
} }
@ -404,6 +540,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
}; };
void action(CloseAction& a) { void action(CloseAction& a) {
readIterPool.reset();
if (db == nullptr) { if (db == nullptr) {
a.done.send(Void()); a.done.send(Void());
return; return;
@ -419,10 +556,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (!s.ok()) { if (!s.ok()) {
logRocksDBError(s, "Destroy"); logRocksDBError(s, "Destroy");
} else { } else {
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Destroy"); TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Destroy");
} }
} }
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Close"); TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Close");
a.done.send(Void()); a.done.send(Void());
} }
}; };
@ -444,11 +581,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<Histogram> readRangeNewIteratorHistogram; Reference<Histogram> readRangeNewIteratorHistogram;
Reference<Histogram> readValueGetHistogram; Reference<Histogram> readValueGetHistogram;
Reference<Histogram> readPrefixGetHistogram; Reference<Histogram> readPrefixGetHistogram;
std::shared_ptr<ReadIteratorPool> readIterPool;
explicit Reader(DB& db) explicit Reader(DB& db, std::shared_ptr<ReadIteratorPool> readIterPool)
: db(db), readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP, : db(db), readIterPool(readIterPool),
ROCKSDB_READRANGE_LATENCY_HISTOGRAM, readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
Histogram::Unit::microseconds)), ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
readValueLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP, readValueLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READVALUE_LATENCY_HISTOGRAM, ROCKSDB_READVALUE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)), Histogram::Unit::microseconds)),
@ -665,24 +804,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
} }
int accumulatedBytes = 0; int accumulatedBytes = 0;
rocksdb::Status s; rocksdb::Status s;
auto options = getReadOptions();
uint64_t deadlineMircos =
db->GetEnv()->NowMicros() + (readRangeTimeout - (readBeginTime - a.startTime)) * 1000000;
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
// When using a prefix extractor, ensure that keys are returned in order even if they cross
// a prefix boundary.
options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
if (a.rowLimit >= 0) { if (a.rowLimit >= 0) {
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0; double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options)); ReadIterator readIter = readIterPool->getIterator();
if (a.getHistograms) { if (a.getHistograms) {
readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime); readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
} }
auto cursor = readIter.iter;
cursor->Seek(toSlice(a.keys.begin)); cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) { while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value())); KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
@ -703,16 +831,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
cursor->Next(); cursor->Next();
} }
s = cursor->status(); s = cursor->status();
readIterPool->returnIterator(readIter);
} else { } else {
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0; double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options)); ReadIterator readIter = readIterPool->getIterator();
if (a.getHistograms) { if (a.getHistograms) {
readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime); readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
} }
auto cursor = readIter.iter;
cursor->SeekForPrev(toSlice(a.keys.end)); cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) { if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev(); cursor->Prev();
@ -736,6 +862,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
cursor->Prev(); cursor->Prev();
} }
s = cursor->status(); s = cursor->status();
readIterPool->returnIterator(readIter);
} }
if (!s.ok()) { if (!s.ok()) {
@ -771,6 +898,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
int numReadWaiters; int numReadWaiters;
FlowLock fetchSemaphore; FlowLock fetchSemaphore;
int numFetchWaiters; int numFetchWaiters;
std::shared_ptr<ReadIteratorPool> readIterPool;
struct Counters { struct Counters {
CounterCollection cc; CounterCollection cc;
@ -784,7 +912,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Counters counters; Counters counters;
explicit RocksDBKeyValueStore(const std::string& path, UID id) explicit RocksDBKeyValueStore(const std::string& path, UID id)
: path(path), id(id), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), : path(path), id(id), readIterPool(new ReadIteratorPool(db, path)),
readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX), fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) { numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) {
@ -805,9 +934,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
writeThread = createGenericThreadPool(); writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool(); readThreads = createGenericThreadPool();
} }
writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr"); writeThread->addThread(new Writer(db, id, readIterPool), "fdb-rocksdb-wr");
TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) { for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(db), "fdb-rocksdb-re"); readThreads->addThread(new Reader(db, readIterPool), "fdb-rocksdb-re");
} }
} }
@ -818,6 +948,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
self->metrics.reset(); self->metrics.reset();
wait(self->readThreads->stop()); wait(self->readThreads->stop());
self->readIterPool.reset();
auto a = new Writer::CloseAction(self->path, deleteOnClose); auto a = new Writer::CloseAction(self->path, deleteOnClose);
auto f = a->done.getFuture(); auto f = a->done.getFuture();
self->writeThread->post(a); self->writeThread->post(a);