From 61d621a4c4ba41f0ae8d0eb0e0cbe249fc8c338d Mon Sep 17 00:00:00 2001
From: He Liu <86634338+liquid-helium@users.noreply.github.com>
Date: Mon, 27 Jun 2022 14:51:26 -0700
Subject: [PATCH] Fix read iterator pool race (#7455)

---
 .../KeyValueStoreShardedRocksDB.actor.cpp | 90 ++++++++++++-------
 tests/CMakeLists.txt                      |  2 +-
 2 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
index a5c4978f31..611f87d11f 100644
--- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
@@ -100,6 +100,7 @@ std::string getErrorReason(BackgroundErrorReason reason) {
 		return format("%d Unknown", reason);
 	}
 }
+
 // Background error handling is tested with Chaos test.
 // TODO: Test background error in simulation. RocksDB doesn't use flow IO in simulation, which limits our ability to
 // inject IO errors. We could implement rocksdb::FileSystem using flow IO to unblock simulation. Also, trace event is
@@ -144,6 +145,11 @@ private:
 	std::mutex mutex;
 };
 
+// Encapsulation of shared states.
+struct ShardedRocksDBState {
+	bool closing = false;
+};
+
 std::shared_ptr<rocksdb::Cache> rocksdb_block_cache = nullptr;
 
 rocksdb::Slice toSlice(StringRef s) {
@@ -328,7 +334,7 @@ public:
 		ASSERT(cf);
 		readRangeOptions.background_purge_on_iterator_cleanup = true;
 		readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
-		TraceEvent("ReadIteratorPool")
+		TraceEvent(SevDebug, "ReadIteratorPool")
 		    .detail("Path", path)
 		    .detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
 		    .detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
@@ -407,25 +413,6 @@ private:
 	uint64_t iteratorsReuseCount;
 };
 
-ACTOR Future<Void> refreshReadIteratorPool(
-    std::unordered_map<std::string, std::shared_ptr<PhysicalShard>>* physicalShards) {
-	state Reference<Histogram> histogram = Histogram::getHistogram(
-	    ROCKSDBSTORAGE_HISTOGRAM_GROUP, "TimeSpentRefreshIterators"_sr, Histogram::Unit::microseconds);
-
-	if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
-		loop {
-			wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
-
-			double startTime = timer_monotonic();
-			for (auto& [_, shard] : *physicalShards) {
-				shard->readIterPool->refreshIterators();
-			}
-			histogram->sample(timer_monotonic() - startTime);
-		}
-	}
-	return Void();
-}
-
 ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
 	loop {
 		wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
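The race being fixed: the deleted refreshReadIteratorPool() above dereferenced every shard's readIterPool without checking whether PhysicalShard::init() had completed, so a periodic refresh could touch a pool that another thread was still publishing. The PhysicalShard hunks below close that hole by setting an atomic flag only after the pool is fully constructed, and checking it before every refresh. A minimal, self-contained C++ sketch of that publish-then-check pattern (illustration only; Shard and IteratorPool are hypothetical stand-ins, not code from this patch):

    #include <atomic>
    #include <memory>
    #include <thread>

    struct IteratorPool {
        void refreshIterators() {}
    };

    struct Shard {
        std::shared_ptr<IteratorPool> pool;   // written once by the initializing thread
        std::atomic<bool> initialized{false}; // published only after 'pool' is fully built

        void init() {
            pool = std::make_shared<IteratorPool>(); // (1) construct the pool
            initialized.store(true);                 // (2) then publish; the store orders (1) before (2)
        }

        void refresh() {
            if (!initialized.load()) // readers must check the flag first; touching
                return;              // 'pool' before (2) completes is a data race
            pool->refreshIterators();
        }
    };

    int main() {
        Shard shard;
        std::thread initThread([&] { shard.init(); });
        shard.refresh(); // safe: either skips, or sees a fully initialized pool
        initThread.join();
        return 0;
    }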
@@ -450,7 +437,7 @@ struct DataShard {
 // PhysicalShard represent a collection of logical shards. A PhysicalShard could have one or more DataShards. A
 // PhysicalShard is stored as a column family in rocksdb. Each PhysicalShard has its own iterator pool.
 struct PhysicalShard {
-	PhysicalShard(rocksdb::DB* db, std::string id) : db(db), id(id) {}
+	PhysicalShard(rocksdb::DB* db, std::string id) : db(db), id(id), isInitialized(false) {}
 
 	rocksdb::Status init() {
 		if (cf) {
@@ -462,10 +449,16 @@ struct PhysicalShard {
 			return status;
 		}
 		readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
+		this->isInitialized.store(true);
 		return status;
 	}
 
-	bool initialized() { return cf != nullptr; }
+	bool initialized() { return this->isInitialized.load(); }
+
+	void refreshReadIteratorPool() {
+		ASSERT(this->readIterPool != nullptr);
+		this->readIterPool->refreshIterators();
+	}
 
 	~PhysicalShard() {
 		if (!deletePending)
@@ -487,6 +480,7 @@ struct PhysicalShard {
 	std::unordered_map<std::string, std::unique_ptr<DataShard>> dataShards;
 	std::shared_ptr<ReadIteratorPool> readIterPool;
 	bool deletePending = false;
+	std::atomic<bool> isInitialized;
 };
 
 // Manages physical shards and maintains logical shard mapping.
@@ -1310,6 +1304,39 @@ ACTOR Future<Void> rocksDBAggregatedMetricsLogger(std::shared_ptr
 	}
 }
 
+ACTOR Future<Void> refreshReadIteratorPools(
+    std::shared_ptr<ShardedRocksDBState> rState,
+    Future<Void> readyToStart,
+    std::unordered_map<std::string, std::shared_ptr<PhysicalShard>>* physicalShards) {
+	state Reference<Histogram> histogram = Histogram::getHistogram(
+	    ROCKSDBSTORAGE_HISTOGRAM_GROUP, "TimeSpentRefreshIterators"_sr, Histogram::Unit::microseconds);
+
+	if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
+		try {
+			wait(readyToStart);
+			loop {
+				wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
+				if (rState->closing) {
+					break;
+				}
+				double startTime = timer_monotonic();
+				for (auto& [_, shard] : *physicalShards) {
+					if (shard->initialized()) {
+						shard->refreshReadIteratorPool();
+					}
+				}
+				histogram->sample(timer_monotonic() - startTime);
+			}
+		} catch (Error& e) {
+			if (e.code() != error_code_actor_cancelled) {
+				TraceEvent(SevError, "RefreshReadIteratorPoolError").errorUnsuppressed(e);
+			}
+		}
+	}
+
+	return Void();
+}
+
 struct Writer : IThreadPoolReceiver {
 	int threadIndex;
 	std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
@@ -1360,15 +1387,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 			return;
 		}
 
-		if (g_network->isSimulated()) {
-			a.metrics = refreshReadIteratorPool(a.shardManager->getAllShards());
-		} else {
-			onMainThread([&] {
-				a.metrics = refreshReadIteratorPool(a.shardManager->getAllShards());
-				return Future<bool>(true);
-			}).blockUntilReady();
-		}
-
 		TraceEvent(SevInfo, "RocksDB").detail("Method", "Open");
 		a.done.send(Void());
 	}
@@ -1865,7 +1883,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
 	// Persist shard mappinng key range should not be in shardMap.
 	explicit ShardedRocksDBKeyValueStore(const std::string& path, UID id)
-	  : path(path), id(id), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
+	  : rState(std::make_shared<ShardedRocksDBState>()), path(path), id(id),
+	    readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
 	    fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
 	    numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
 	    numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
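How the pieces above cooperate: the store allocates one ShardedRocksDBState and hands it to the new refreshReadIteratorPools() actor, which waits for the open future before touching any shard, skips shards whose init() has not finished, and stops once closing is set (the doClose() hunk below sets it). A standalone sketch of this shared-shutdown-flag shape, using plain threads instead of flow actors (the flag must be atomic here; the patch's plain bool is fine because all actors run on the main thread):

    #include <atomic>
    #include <chrono>
    #include <memory>
    #include <thread>

    struct State {
        std::atomic<bool> closing{false}; // shared between the store and the refresher
    };

    // Periodic task holding shared ownership of State, so the flag stays valid
    // even if the task outlives the object that spawned it.
    void refreshLoop(std::shared_ptr<State> state) {
        while (!state->closing.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); // refresh interval
            if (state->closing.load())
                break; // re-check after sleeping, mirroring the check after delay()
            // ... refresh iterator pools of initialized shards here ...
        }
    }

    int main() {
        auto state = std::make_shared<State>();
        std::thread refresher(refreshLoop, state);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        state->closing.store(true); // close path: signal first, then wait
        refresher.join();
        return 0;
    }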
@@ -1898,8 +1917,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	Future<Void> getError() const override { return errorFuture; }
 
 	ACTOR static void doClose(ShardedRocksDBKeyValueStore* self, bool deleteOnClose) {
+		self->rState->closing = true;
 		// The metrics future retains a reference to the DB, so stop it before we delete it.
 		self->metrics.reset();
+		self->refreshHolder.cancel();
 		wait(self->readThreads->stop());
 
 		auto a = new Writer::CloseAction(&self->shardManager, deleteOnClose);
@@ -1930,6 +1951,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 		auto a = std::make_unique<Writer::OpenAction>(
 		    &shardManager, metrics, &readSemaphore, &fetchSemaphore, errorListener);
 		openFuture = a->done.getFuture();
+		this->refreshHolder = refreshReadIteratorPools(this->rState, openFuture, shardManager.getAllShards());
 		writeThread->post(a.release());
 		return openFuture;
 	}
@@ -2092,6 +2114,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	// Used for debugging shard mapping issue.
 	std::vector<std::pair<KeyRange, std::string>> getDataMapping() { return shardManager.getDataMapping(); }
 
+	std::shared_ptr<ShardedRocksDBState> rState;
+	ShardManager shardManager;
 	std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
 	std::string path;
 	const std::string dataPath;
@@ -2101,7 +2125,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	std::shared_ptr<RocksDBErrorListener> errorListener;
 	Future<Void> errorFuture;
 	Promise<Void> closePromise;
-	ShardManager shardManager;
 	Future<Void> openFuture;
 	Optional<Future<Void>> metrics;
 	FlowLock readSemaphore;
@@ -2109,6 +2132,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	FlowLock fetchSemaphore;
 	int numFetchWaiters;
 	Counters counters;
+	Future<Void> refreshHolder;
 };
 
 } // namespace

diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index e572582650..dcdb46ffb3 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -190,7 +190,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
   if (WITH_ROCKSDB_EXPERIMENTAL)
     add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
-    add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml IGNORE) # TODO: re-enable once storage engine is stable.
+    add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
     add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
   else()
     add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml IGNORE)
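The last piece is lifetime: the store keeps the refresher in Future<Void> refreshHolder, and doClose() both sets rState->closing and calls refreshHolder.cancel() before the RocksDB instance is torn down, so no refresh can run against a dying database. The same own-then-cancel shape in standard C++20, with std::jthread and std::stop_token standing in for flow's Future<Void> and actor cancellation (names mirror the patch, but the code is only an analogy, not flow code):

    #include <chrono>
    #include <stop_token>
    #include <thread>

    class Store {
        std::jthread refreshHolder; // owns the background refresher, like Future<Void> refreshHolder

    public:
        Store()
          : refreshHolder([](std::stop_token stop) {
                while (!stop.stop_requested()) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    // ... refresh read iterator pools here ...
                }
            }) {}

        void close() {
            refreshHolder.request_stop(); // plays the role of refreshHolder.cancel() in doClose()
            refreshHolder.join();         // make sure the refresher is gone before deleting the DB
        }
    };

    int main() {
        Store store;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        store.close();
        return 0;
    }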