diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 84a6e53a78..fabeaa6754 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -301,7 +301,8 @@ rocksdb::Options getOptions() { options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM); } - // TODO: enable rocksdb metrics. + options.statistics = rocksdb::CreateDBStatistics(); + options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers); options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY; return options; } @@ -600,11 +601,10 @@ public: return Void(); } - rocksdb::Status init() { + rocksdb::Status init(rocksdb::Options options) { // Open instance. TraceEvent(SevInfo, "ShardedRocksShardManagerInitBegin", this->logId).detail("DataPath", path); std::vector columnFamilies; - rocksdb::Options options = getOptions(); rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, path, &columnFamilies); rocksdb::ColumnFamilyOptions cfOptions = getCFOptions(); @@ -1041,9 +1041,7 @@ private: class RocksDBMetrics { public: - RocksDBMetrics(); - // Statistics - std::shared_ptr getStatsObjForRocksDB(); + RocksDBMetrics(UID debugID, std::shared_ptr stats); void logStats(rocksdb::DB* db); // PerfContext void resetPerfContext(); @@ -1069,9 +1067,10 @@ public: Reference getWriteHistogram(); Reference getDeleteCompactRangeHistogram(); // Stat for Memory Usage - void logMemUsagePerShard(std::string shardName, rocksdb::DB* db); + void logMemUsage(rocksdb::DB* db); private: + const UID debugID; // Global Statistic Input to RocksDB DB instance std::shared_ptr stats; // Statistic Output from RocksDB @@ -1158,9 +1157,8 @@ Reference RocksDBMetrics::getDeleteCompactRangeHistogram() { return deleteCompactRangeHistogram; } -RocksDBMetrics::RocksDBMetrics() { - stats = rocksdb::CreateDBStatistics(); - stats->set_stats_level(rocksdb::kExceptHistogramOrTimers); +RocksDBMetrics::RocksDBMetrics(UID debugID, std::shared_ptr stats) + : debugID(debugID), stats(stats) { tickerStats = { { "StallMicros", rocksdb::STALL_MICROS, 0 }, { "BytesRead", rocksdb::BYTES_READ, 0 }, @@ -1343,16 +1341,8 @@ RocksDBMetrics::RocksDBMetrics() { ROCKSDBSTORAGE_HISTOGRAM_GROUP, ROCKSDB_DELETE_COMPACTRANGE_HISTOGRAM, Histogram::Unit::microseconds); } -std::shared_ptr RocksDBMetrics::getStatsObjForRocksDB() { - // Zhe: reserved for statistic of RocksDBMetrics per shard - // ASSERT(shard != nullptr && shard->stats != nullptr); - // return shard->stats; - ASSERT(stats != nullptr); - return stats; -} - void RocksDBMetrics::logStats(rocksdb::DB* db) { - TraceEvent e("ShardedRocksDBMetrics"); + TraceEvent e(SevInfo, "ShardedRocksDBMetrics", debugID); uint64_t stat; for (auto& [name, ticker, cumulation] : tickerStats) { stat = stats->getTickerCount(ticker); @@ -1364,18 +1354,10 @@ void RocksDBMetrics::logStats(rocksdb::DB* db) { ASSERT(db->GetIntProperty(property, &stat)); e.detail(name, stat); } - /* - stat = readIterPool->numReadIteratorsCreated(); - e.detail("NumReadIteratorsCreated", stat - readIteratorPoolStats["NumReadIteratorsCreated"]); - readIteratorPoolStats["NumReadIteratorsCreated"] = stat; - stat = readIterPool->numTimesReadIteratorsReused(); - e.detail("NumTimesReadIteratorsReused", stat - readIteratorPoolStats["NumTimesReadIteratorsReused"]); - readIteratorPoolStats["NumTimesReadIteratorsReused"] = stat; - */ } -void RocksDBMetrics::logMemUsagePerShard(std::string shardName, rocksdb::DB* db) { - TraceEvent e("ShardedRocksDBShardMemMetrics"); +void RocksDBMetrics::logMemUsage(rocksdb::DB* db) { + TraceEvent e(SevInfo, "ShardedRocksDBMemMetrics", debugID); uint64_t stat; ASSERT(db != nullptr); ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kBlockCacheUsage, &stat)); @@ -1386,7 +1368,6 @@ void RocksDBMetrics::logMemUsagePerShard(std::string shardName, rocksdb::DB* db) e.detail("AllMemtablesBytes", stat); ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kBlockCachePinnedUsage, &stat)); e.detail("BlockCachePinnedUsage", stat); - e.detail("Name", shardName); } void RocksDBMetrics::resetPerfContext() { @@ -1401,7 +1382,7 @@ void RocksDBMetrics::setPerfContext(int index) { } void RocksDBMetrics::logPerfContext(bool ignoreZeroMetric) { - TraceEvent e("ShardedRocksDBPerfContextMetrics"); + TraceEvent e(SevInfo, "ShardedRocksDBPerfContextMetrics", debugID); e.setMaxEventLength(20000); for (auto& [name, metric, vals] : perfContextMetrics) { uint64_t s = 0; @@ -1563,17 +1544,30 @@ uint64_t RocksDBMetrics::getRocksdbPerfcontextMetric(int metric) { return 0; } -ACTOR Future rocksDBAggregatedMetricsLogger(std::shared_ptr rocksDBMetrics, rocksdb::DB* db) { - loop { - wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY)); - /* - if (SERVER_KNOBS->ROCKSDB_ENABLE_STATISTIC) { - rocksDBMetrics->logStats(db); +ACTOR Future rocksDBAggregatedMetricsLogger(std::shared_ptr rState, + Future openFuture, + std::shared_ptr rocksDBMetrics, + ShardManager* shardManager) { + try { + wait(openFuture); + state rocksdb::DB* db = shardManager->getDb(); + loop { + wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY)); + if (rState->closing) { + break; + } + rocksDBMetrics->logStats(db); + rocksDBMetrics->logMemUsage(db); + if (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) { + rocksDBMetrics->logPerfContext(true); + } + } + } catch (Error& e) { + if (e.code() != error_code_actor_cancelled) { + TraceEvent(SevError, "ShardedRocksDBMetricsError").errorUnsuppressed(e); } - if (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) { - rocksDBMetrics->logPerfContext(true); - }*/ } + return Void(); } struct ShardedRocksDBKeyValueStore : IKeyValueStore { @@ -1639,6 +1633,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { struct OpenAction : TypedAction { ShardManager* shardManager; + rocksdb::Options dbOptions; ThreadReturnPromise done; Optional>& metrics; const FlowLock* readLock; @@ -1646,18 +1641,20 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { std::shared_ptr errorListener; OpenAction(ShardManager* shardManager, + rocksdb::Options dbOptions, Optional>& metrics, const FlowLock* readLock, const FlowLock* fetchLock, std::shared_ptr errorListener) - : shardManager(shardManager), metrics(metrics), readLock(readLock), fetchLock(fetchLock), - errorListener(errorListener) {} + : shardManager(shardManager), dbOptions(dbOptions), metrics(metrics), readLock(readLock), + fetchLock(fetchLock), errorListener(errorListener) {} double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } }; void action(OpenAction& a) { - auto status = a.shardManager->init(); + auto status = a.shardManager->init(a.dbOptions); + if (!status.ok()) { logRocksDBError(status, "Open"); a.done.sendError(statusToError(status)); @@ -2114,10 +2111,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { std::reverse(a.shardRanges.begin(), a.shardRanges.end()); } - // TODO: consider multi-thread reads. It's possible to read multiple shards in parallel. However, the number - // of rows to read needs to be calculated based on the previous read result. We may read more than we - // expected when parallel read is used when the previous result is not available. It's unlikely to get to - // performance improvement when the actual number of rows to read is very small. + // TODO: consider multi-thread reads. It's possible to read multiple shards in parallel. However, the + // number of rows to read needs to be calculated based on the previous read result. We may read more + // than we expected when parallel read is used when the previous result is not available. It's unlikely + // to get to performance improvement when the actual number of rows to read is very small. int accumulatedBytes = 0; int numShards = 0; for (auto& [shard, range] : a.shardRanges) { @@ -2175,17 +2172,19 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX), errorListener(std::make_shared()), errorFuture(errorListener->getFuture()), - shardManager(path, id), rocksDBMetrics(new RocksDBMetrics()) { - // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine - // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also - // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When - // performing the reads in background threads in simulation, the event loop thinks there is no work to do and - // advances time faster than 1 sec/sec. By the time the blocking read actually finishes, simulation has advanced - // time by more than 5 seconds, so every read fails with a transaction_too_old error. Doing blocking IO on the - // main thread solves this issue. There are almost certainly better fixes, but my goal was to get a less - // invasive change merged first and work on a more realistic version if/when we think that would provide - // substantially more confidence in the correctness. - // TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are occurring. + shardManager(path, id), dbOptions(getOptions()), + rocksDBMetrics(std::make_shared(id, dbOptions.statistics)) { + // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage + // engine is still multi-threaded as background compaction threads are still present. Reads/writes to disk + // will also block the network thread in a way that would be unacceptable in production but is a necessary + // evil here. When performing the reads in background threads in simulation, the event loop thinks there is + // no work to do and advances time faster than 1 sec/sec. By the time the blocking read actually finishes, + // simulation has advanced time by more than 5 seconds, so every read fails with a transaction_too_old + // error. Doing blocking IO on the main thread solves this issue. There are almost certainly better fixes, + // but my goal was to get a less invasive change merged first and work on a more realistic version if/when + // we think that would provide substantially more confidence in the correctness. + // TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are + // occurring. if (g_network->isSimulated()) { writeThread = CoroThreadPool::createThreadPool(); readThreads = CoroThreadPool::createThreadPool(); @@ -2233,14 +2232,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { Future init() override { if (openFuture.isValid()) { return openFuture; - // Restore durable state if KVS is open. KVS will be re-initialized during rollback. To avoid the cost of - // opening and closing multiple rocksdb instances, we reconcile the shard map using persist shard mapping - // data. + // Restore durable state if KVS is open. KVS will be re-initialized during rollback. To avoid the cost + // of opening and closing multiple rocksdb instances, we reconcile the shard map using persist shard + // mapping data. } else { auto a = std::make_unique( - &shardManager, metrics, &readSemaphore, &fetchSemaphore, errorListener); + &shardManager, dbOptions, metrics, &readSemaphore, &fetchSemaphore, errorListener); openFuture = a->done.getFuture(); - this->metrics = ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager); + this->metrics = ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) && + rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager); this->refreshHolder = refreshReadIteratorPools(this->rState, openFuture, shardManager.getAllShards()); writeThread->post(a.release()); return openFuture; @@ -2433,6 +2433,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { std::shared_ptr rState; ShardManager shardManager; + rocksdb::Options dbOptions; std::shared_ptr rocksDBMetrics; std::string path; UID id;