diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index f276a7b577..2bd7deb162 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -355,6 +355,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 100 );
 	init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 50 );
 	init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
+	init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
+	init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
 
 	// Leader election
 	bool longLeaderElection = randomize && BUGGIFY;
diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h
index 3dafbbed52..efeb10cee3 100644
--- a/fdbclient/ServerKnobs.h
+++ b/fdbclient/ServerKnobs.h
@@ -289,6 +289,8 @@ public:
 	// These histograms are in read and write path which can cause performance overhead.
 	// Set to 0 to disable histograms.
 	double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
+	double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
+	bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
 
 	// Leader election
 	int MAX_NOTIFICATIONS;
diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp
index 43e414e730..441c4e1a7a 100644
--- a/fdbserver/KeyValueStoreRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp
@@ -35,6 +35,7 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 :
               "Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
 
 namespace {
+using DB = rocksdb::DB*;
 
 const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
 const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency");
@@ -130,10 +131,125 @@ rocksdb::ReadOptions getReadOptions() {
 	return options;
 }
 
+struct ReadIterator {
+	uint64_t index; // incrementing counter to uniquely identify read iterator.
+	bool inUse;
+	std::shared_ptr<rocksdb::Iterator> iter;
+	double creationTime;
+	ReadIterator(uint64_t index, DB& db, rocksdb::ReadOptions& options)
+	  : index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options)) {}
+};
+
+/*
+ReadIteratorPool: Collection of iterators. Reuses iterators on non-concurrent multiple read operations,
+instead of creating and deleting for every read.
+
+Read: IteratorPool provides an unused iterator if exists or creates and gives a new iterator.
+Returns back the iterator after the read is done.
+
+Write: Iterators in the pool are deleted, forcing new iterator creation on next reads. The iterators
+which are currently used by the reads can continue using the iterator as it is a shared_ptr. Once
+the read is processed, shared_ptr goes out of scope and gets deleted. Eventually the iterator object
+gets deleted as the ref count becomes 0.
+*/
+class ReadIteratorPool {
+public:
+	ReadIteratorPool(DB& db, const std::string& path)
+	  : db(db), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) {
+		readRangeOptions.background_purge_on_iterator_cleanup = true;
+		readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
+		TraceEvent("ReadIteratorPool")
+		    .detail("Path", path)
+		    .detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
+		    .detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
+	}
+
+	// Called on every db commit.
+	void update() {
+		if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
+			std::lock_guard<std::mutex> lock(mutex);
+			iteratorsMap.clear();
+		}
+	}
+
+	// Called on every read operation.
+	ReadIterator getIterator() {
+		if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
+			std::lock_guard<std::mutex> lock(mutex);
+			for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
+				if (!it->second.inUse) {
+					it->second.inUse = true;
+					iteratorsReuseCount++;
+					return it->second;
+				}
+			}
+			index++;
+			ReadIterator iter(index, db, readRangeOptions);
+			iteratorsMap.insert({ index, iter });
+			return iter;
+		} else {
+			index++;
+			ReadIterator iter(index, db, readRangeOptions);
+			return iter;
+		}
+	}
+
+	// Called on every read operation, after the keys are collected.
+	void returnIterator(ReadIterator& iter) {
+		if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
+			std::lock_guard<std::mutex> lock(mutex);
+			it = iteratorsMap.find(iter.index);
+			// iterator found: put the iterator back to the pool(inUse=false).
+			// iterator not found: update would have removed the iterator from pool, so nothing to do.
+			if (it != iteratorsMap.end()) {
+				ASSERT(it->second.inUse);
+				it->second.inUse = false;
+			}
+		}
+	}
+
+	// Called for every ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME seconds in a loop.
+	void refreshIterators() {
+		std::lock_guard<std::mutex> lock(mutex);
+		it = iteratorsMap.begin();
+		while (it != iteratorsMap.end()) {
+			if (now() - it->second.creationTime > SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME) {
+				it = iteratorsMap.erase(it);
+			} else {
+				it++;
+			}
+		}
+	}
+
+	uint64_t numReadIteratorsCreated() { return index; }
+
+	uint64_t numTimesReadIteratorsReused() { return iteratorsReuseCount; }
+
+private:
+	std::unordered_map<int, ReadIterator> iteratorsMap;
+	std::unordered_map<int, ReadIterator>::iterator it;
+	DB& db;
+	rocksdb::ReadOptions readRangeOptions;
+	std::mutex mutex;
+	// incrementing counter for every new iterator creation, to uniquely identify the iterator in returnIterator().
+	uint64_t index;
+	uint64_t iteratorsReuseCount;
+};
+
+ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
+	if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
+		loop {
+			wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
+			readIterPool->refreshIterators();
+		}
+	}
+	return Void();
+}
+
 ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
 	loop {
 		wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
-		TraceEvent e(SevInfo, "RocksDBFlowLock");
+		TraceEvent e("RocksDBFlowLock");
 		e.detail("ReadAvailable", readLock->available());
 		e.detail("ReadActivePermits", readLock->activePermits());
 		e.detail("ReadWaiters", readLock->waiters());
@@ -143,7 +259,9 @@ ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetc
 	}
 }
 
-ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics, rocksdb::DB* db) {
+ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics,
+                                       rocksdb::DB* db,
+                                       std::shared_ptr<ReadIteratorPool> readIterPool) {
 	state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
 		{ "StallMicros", rocksdb::STALL_MICROS, 0 },
 		{ "BytesRead", rocksdb::BYTES_READ, 0 },
@@ -206,22 +324,37 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
 		{ "BaseLevel", rocksdb::DB::Properties::kBaseLevel },
 		{ "EstPendCompactBytes", rocksdb::DB::Properties::kEstimatePendingCompactionBytes },
 	};
+
+	state std::unordered_map<std::string, uint64_t> readIteratorPoolStats = {
+		{ "NumReadIteratorsCreated", 0 },
+		{ "NumTimesReadIteratorsReused", 0 },
+	};
+
 	loop {
 		wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
 		TraceEvent e("RocksDBMetrics");
+		uint64_t stat;
 		for (auto& t : tickerStats) {
 			auto& [name, ticker, cum] = t;
-			uint64_t val = statistics->getTickerCount(ticker);
-			e.detail(name, val - cum);
-			cum = val;
+			stat = statistics->getTickerCount(ticker);
+			e.detail(name, stat - cum);
+			cum = stat;
 		}
 
 		for (auto& p : propertyStats) {
 			auto& [name, property] = p;
-			uint64_t stat = 0;
+			stat = 0;
 			ASSERT(db->GetIntProperty(property, &stat));
 			e.detail(name, stat);
 		}
+
+		stat = readIterPool->numReadIteratorsCreated();
+		e.detail("NumReadIteratorsCreated", stat - readIteratorPoolStats["NumReadIteratorsCreated"]);
+		readIteratorPoolStats["NumReadIteratorsCreated"] = stat;
+
+		stat = readIterPool->numTimesReadIteratorsReused();
+		e.detail("NumTimesReadIteratorsReused", stat - readIteratorPoolStats["NumTimesReadIteratorsReused"]);
+		readIteratorPoolStats["NumTimesReadIteratorsReused"] = stat;
 	}
 }
@@ -245,7 +378,6 @@ Error statusToError(const rocksdb::Status& s) {
 }
 
 struct RocksDBKeyValueStore : IKeyValueStore {
-	using DB = rocksdb::DB*;
 	using CF = rocksdb::ColumnFamilyHandle*;
 
 	struct Writer : IThreadPoolReceiver {
@@ -256,11 +388,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 		Reference<Histogram> commitQueueWaitHistogram;
 		Reference<Histogram> writeHistogram;
 		Reference<Histogram> deleteCompactRangeHistogram;
+		std::shared_ptr<ReadIteratorPool> readIterPool;
 
-		explicit Writer(DB& db, UID id)
-		  : db(db), id(id), commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
-		                                                                   ROCKSDB_COMMIT_LATENCY_HISTOGRAM,
-		                                                                   Histogram::Unit::microseconds)),
+		explicit Writer(DB& db, UID id, std::shared_ptr<ReadIteratorPool> readIterPool)
+		  : db(db), id(id), readIterPool(readIterPool),
+		    commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
+		                                                   ROCKSDB_COMMIT_LATENCY_HISTOGRAM,
+		                                                   Histogram::Unit::microseconds)),
 		    commitActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
 		                                                  ROCKSDB_COMMIT_ACTION_HISTOGRAM,
 		                                                  Histogram::Unit::microseconds)),
@@ -306,16 +440,17 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				logRocksDBError(status, "Open");
 				a.done.sendError(statusToError(status));
 			} else {
-				TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Open");
+				TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Open");
 				if (g_network->isSimulated()) {
 					// The current thread and main thread are same when the code runs in simulation.
 					// blockUntilReady() is getting the thread into deadlock state, so directly calling
 					// the metricsLogger.
-					a.metrics = rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock);
+					a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
+					            flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
 				} else {
 					onMainThread([&] {
-						a.metrics =
-						    rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock);
+						a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
+						            flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
 						return Future<bool>(true);
 					}).blockUntilReady();
 				}
@@ -369,6 +504,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 
 			double writeBeginTime = a.getHistograms ? timer_monotonic() : 0;
 			auto s = db->Write(options, a.batchToCommit.get());
+			readIterPool->update();
 			if (a.getHistograms) {
 				writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime);
 			}
@@ -404,6 +540,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
 		};
 		void action(CloseAction& a) {
+			readIterPool.reset();
 			if (db == nullptr) {
 				a.done.send(Void());
 				return;
@@ -419,10 +556,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				if (!s.ok()) {
 					logRocksDBError(s, "Destroy");
 				} else {
-					TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Destroy");
+					TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Destroy");
 				}
 			}
-			TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Close");
+			TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Close");
 			a.done.send(Void());
 		}
 	};
@@ -444,11 +581,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 		Reference<Histogram> readRangeNewIteratorHistogram;
 		Reference<Histogram> readValueGetHistogram;
 		Reference<Histogram> readPrefixGetHistogram;
+		std::shared_ptr<ReadIteratorPool> readIterPool;
 
-		explicit Reader(DB& db)
-		  : db(db), readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
-		                                                              ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
-		                                                              Histogram::Unit::microseconds)),
+		explicit Reader(DB& db, std::shared_ptr<ReadIteratorPool> readIterPool)
+		  : db(db), readIterPool(readIterPool),
+		    readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
+		                                                      ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
+		                                                      Histogram::Unit::microseconds)),
 		    readValueLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
 		                                                      ROCKSDB_READVALUE_LATENCY_HISTOGRAM,
 		                                                      Histogram::Unit::microseconds)),
@@ -665,24 +804,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			}
 			int accumulatedBytes = 0;
 			rocksdb::Status s;
-			auto options = getReadOptions();
-			uint64_t deadlineMircos =
-			    db->GetEnv()->NowMicros() + (readRangeTimeout - (readBeginTime - a.startTime)) * 1000000;
-			std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
-			options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
-			// When using a prefix extractor, ensure that keys are returned in order even if they cross
-			// a prefix boundary.
-			options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
 			if (a.rowLimit >= 0) {
-				auto endSlice = toSlice(a.keys.end);
-				options.iterate_upper_bound = &endSlice;
-
 				double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
-				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
+				ReadIterator readIter = readIterPool->getIterator();
 				if (a.getHistograms) {
 					readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
 				}
-
+				auto cursor = readIter.iter;
 				cursor->Seek(toSlice(a.keys.begin));
 				while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
 					KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
@@ -703,16 +831,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 					cursor->Next();
 				}
 				s = cursor->status();
+				readIterPool->returnIterator(readIter);
 			} else {
-				auto beginSlice = toSlice(a.keys.begin);
-				options.iterate_lower_bound = &beginSlice;
-
 				double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
-				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
+				ReadIterator readIter = readIterPool->getIterator();
 				if (a.getHistograms) {
 					readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
 				}
-
+				auto cursor = readIter.iter;
 				cursor->SeekForPrev(toSlice(a.keys.end));
 				if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
 					cursor->Prev();
@@ -736,6 +862,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 				cursor->Prev();
 			}
 			s = cursor->status();
+			readIterPool->returnIterator(readIter);
 		}
 
 		if (!s.ok()) {
@@ -771,6 +898,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 	int numReadWaiters;
 	FlowLock fetchSemaphore;
 	int numFetchWaiters;
+	std::shared_ptr<ReadIteratorPool> readIterPool;
 
 	struct Counters {
 		CounterCollection cc;
@@ -784,7 +912,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 	Counters counters;
 
 	explicit RocksDBKeyValueStore(const std::string& path, UID id)
-	  : path(path), id(id), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
+	  : path(path), id(id), readIterPool(new ReadIteratorPool(db, path)),
+	    readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
 	    fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
 	    numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
 	    numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) {
@@ -805,9 +934,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 			writeThread = createGenericThreadPool();
 			readThreads = createGenericThreadPool();
 		}
-		writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr");
+		writeThread->addThread(new Writer(db, id, readIterPool), "fdb-rocksdb-wr");
+		TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
 		for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
-			readThreads->addThread(new Reader(db), "fdb-rocksdb-re");
+			readThreads->addThread(new Reader(db, readIterPool), "fdb-rocksdb-re");
 		}
 	}
 
@@ -818,6 +948,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 
 		self->metrics.reset();
 		wait(self->readThreads->stop());
+		self->readIterPool.reset();
 		auto a = new Writer::CloseAction(self->path, deleteOnClose);
 		auto f = a->done.getFuture();
 		self->writeThread->post(a);