diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index c15692a5e3..d7849395fd 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -960,7 +960,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( KMS_CONNECTOR_TYPE, "RESTKmsConnector" );
 
     // Blob granlues
-    init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually
+    init( BG_URL, isSimulated ? "file://simfdb/fdbblob/" : "" ); // TODO: store in system key space or something, eventually
     bool buggifyMediumGranules = simulationMediumShards || (randomize && BUGGIFY);
     // BlobGranuleVerify* simulation tests use "knobs", BlobGranuleCorrectness* use "tenant", default in real clusters is "knobs"
     init( BG_METADATA_SOURCE, "knobs" );
diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
index 462c42f749..4cd1410c9e 100644
--- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
@@ -155,7 +155,7 @@ struct ShardedRocksDBState {
 
 std::shared_ptr<rocksdb::Cache> rocksdb_block_cache = nullptr;
 
-rocksdb::Slice toSlice(StringRef s) {
+const rocksdb::Slice toSlice(StringRef s) {
     return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
 }
 
@@ -309,8 +309,20 @@ struct ReadIterator {
     bool inUse;
     std::shared_ptr<rocksdb::Iterator> iter;
     double creationTime;
-    ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, rocksdb::ReadOptions& options)
+    KeyRange keyRange;
+    std::shared_ptr<rocksdb::Slice> beginSlice, endSlice;
+
+    ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const rocksdb::ReadOptions& options)
       : index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
+    ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const KeyRange& range)
+      : index(index), inUse(true), creationTime(now()), keyRange(range) {
+        auto options = getReadOptions();
+        beginSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
+        options.iterate_lower_bound = beginSlice.get();
+        endSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
+        options.iterate_upper_bound = endSlice.get();
+        iter = std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
+    }
 };
 
 /*
@@ -348,7 +360,8 @@ public:
     }
 
     // Called on every read operation.
-    ReadIterator getIterator() {
+    ReadIterator getIterator(const KeyRange& range) {
+        // Shared iterators are not bounded.
         if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
             std::lock_guard<std::mutex> lock(mutex);
             for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
@@ -364,7 +377,7 @@ public:
             return iter;
         } else {
             index++;
-            ReadIterator iter(cf, index, db, readRangeOptions);
+            ReadIterator iter(cf, index, db, range);
             return iter;
         }
     }
@@ -511,7 +524,7 @@ struct PhysicalShard {
     double deleteTimeSec;
 };
 
-int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
+int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, int byteLimit, RangeResult* result) {
     if (rowLimit == 0 || byteLimit == 0) {
         return 0;
     }
@@ -523,7 +536,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit,
     // When using a prefix extractor, ensure that keys are returned in order even if they cross
     // a prefix boundary.
     if (rowLimit >= 0) {
-        ReadIterator readIter = shard->readIterPool->getIterator();
+        ReadIterator readIter = shard->readIterPool->getIterator(range);
         auto cursor = readIter.iter;
         cursor->Seek(toSlice(range.begin));
         while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
@@ -540,7 +553,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit,
         s = cursor->status();
         shard->readIterPool->returnIterator(readIter);
     } else {
-        ReadIterator readIter = shard->readIterPool->getIterator();
+        ReadIterator readIter = shard->readIterPool->getIterator(range);
         auto cursor = readIter.iter;
         cursor->SeekForPrev(toSlice(range.end));
         if (cursor->Valid() && toStringRef(cursor->key()) == range.end) {
@@ -2150,10 +2163,16 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
       : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
         getHistograms(
             (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
+        std::set<PhysicalShard*> usedShards;
         for (const DataShard* shard : shards) {
-            if (shard != nullptr) {
-                shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
-            }
+            ASSERT(shard);
+            shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
+            usedShards.insert(shard->physicalShard);
+        }
+        if (usedShards.size() != shards.size()) {
+            TraceEvent("ReadRangeMetrics")
+                .detail("NumPhysicalShards", usedShards.size())
+                .detail("NumDataShards", shards.size());
         }
     }
     double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp
index 6e4a1ae579..9efa86f297 100644
--- a/fdbserver/fdbserver.actor.cpp
+++ b/fdbserver/fdbserver.actor.cpp
@@ -2149,7 +2149,7 @@ int main(int argc, char* argv[]) {
             auto dataFolder = opts.dataFolder.size() ? opts.dataFolder : "simfdb";
             std::vector<std::string> directories = platform::listDirectories(dataFolder);
-            const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests" };
+            const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests", "fdbblob" };
 
             for (const auto& dir : directories) {
                 if (dir.size() != 32 && allowedDirectories.count(dir) == 0 &&
                     dir.find("snap") == std::string::npos) {
diff --git a/fdbserver/include/fdbserver/StorageMetrics.actor.h b/fdbserver/include/fdbserver/StorageMetrics.actor.h
index dc518cf318..c524d36f0a 100644
--- a/fdbserver/include/fdbserver/StorageMetrics.actor.h
+++ b/fdbserver/include/fdbserver/StorageMetrics.actor.h
@@ -44,6 +44,8 @@ const StringRef TLOG_MSGS_PTREE_UPDATES_LATENCY_HISTOGRAM = "TLogMsgsPTreeUpdate
 const StringRef STORAGE_UPDATES_DURABLE_LATENCY_HISTOGRAM = "StorageUpdatesDurableLatency"_sr;
 const StringRef STORAGE_COMMIT_LATENCY_HISTOGRAM = "StorageCommitLatency"_sr;
 const StringRef SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM = "SSDurableVersionUpdateLatency"_sr;
+const StringRef SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM = "SSReadRangeBytesReturned"_sr;
+const StringRef SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM = "SSReadRangeBytesLimit"_sr;
 
 struct StorageMetricSample {
     IndexedSet<Key, int64_t> sample;
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 90f8350b82..ed257dd207 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -735,6 +735,9 @@ public:
     Reference<Histogram> storageUpdatesDurableLatencyHistogram;
     Reference<Histogram> storageCommitLatencyHistogram;
     Reference<Histogram> ssDurableVersionUpdateLatencyHistogram;
+    // Histograms of requests sent to KVS.
+    Reference<Histogram> readRangeBytesReturnedHistogram;
+    Reference<Histogram> readRangeBytesLimitHistogram;
 
     // watch map operations
     Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key) const;
@@ -1296,6 +1299,12 @@ public:
         ssDurableVersionUpdateLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
                                                                        SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM,
                                                                        Histogram::Unit::microseconds)),
+        readRangeBytesReturnedHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
+                                                                SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM,
+                                                                Histogram::Unit::countLinear)),
+        readRangeBytesLimitHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
+                                                             SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM,
+                                                             Histogram::Unit::countLinear)),
         tag(invalidTag), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0),
         storage(this, storage), shardChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
         prevVersion(0), rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
@@ -3460,6 +3469,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
     RangeResult atStorageVersion =
         wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options));
     data->counters.kvScanBytes += atStorageVersion.logicalSize();
+    data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize());
+    data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
 
     ASSERT(atStorageVersion.size() <= limit);
     if (data->storageVersion() > version) {
@@ -3555,6 +3566,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
     RangeResult atStorageVersion =
         wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options));
     data->counters.kvScanBytes += atStorageVersion.logicalSize();
+    data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize());
+    data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
 
     ASSERT(atStorageVersion.size() <= -limit);
     if (data->storageVersion() > version)
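
Note (illustration, not part of the patch): the new ReadIterator constructor above bounds each scan with RocksDB's iterate_lower_bound / iterate_upper_bound read options, so iteration stops at the range edge instead of relying only on a key comparison in the read loop. A minimal standalone sketch of that technique against the stock RocksDB API follows; boundedScan and its parameters are hypothetical names and error handling is simplified to an assert.

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>

#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Scan [begin, end) with iterator bounds so RocksDB itself stops at the range
// edge; the loop no longer needs its own "key < end" comparison.
std::vector<std::pair<std::string, std::string>> boundedScan(rocksdb::DB* db,
                                                             const std::string& begin,
                                                             const std::string& end) {
    rocksdb::ReadOptions options;
    // The bound slices must outlive the iterator (the patch keeps them alive in
    // the ReadIterator via shared_ptr); here the stack frame is enough.
    rocksdb::Slice lower(begin);
    rocksdb::Slice upper(end);
    options.iterate_lower_bound = &lower;
    options.iterate_upper_bound = &upper;

    std::vector<std::pair<std::string, std::string>> out;
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
    for (it->Seek(lower); it->Valid(); it->Next()) {
        // With iterate_upper_bound set, Valid() becomes false once the key
        // reaches `end`, so iteration terminates at the range boundary.
        out.emplace_back(it->key().ToString(), it->value().ToString());
    }
    assert(it->status().ok());
    return out;
}

Bounding the iterator keeps RocksDB from scanning past the requested range, which appears to be the motivation for handing the range to getIterator() instead of always reusing an unbounded shared iterator.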
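
Note (illustration, not part of the patch): the two new storage-server histograms record, for each readRange call into the KVS, the bytes actually returned versus the byte limit requested. The toy linear-bucket histogram below only mirrors that idea in standalone C++; it is not FDB's Histogram class, and the bucket width, names, and sample values are made up.

#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <vector>

// Toy linear-bucket histogram, standing in for FDB's Histogram with
// Unit::countLinear; it simply counts values per fixed-width bucket.
struct LinearHistogram {
    uint64_t bucketWidth;
    std::vector<uint64_t> buckets;

    LinearHistogram(uint64_t width, size_t nBuckets) : bucketWidth(width), buckets(nBuckets, 0) {}

    void sample(uint64_t value) {
        size_t idx = std::min<uint64_t>(value / bucketWidth, buckets.size() - 1);
        buckets[idx]++;
    }
};

int main() {
    // One histogram for bytes actually returned by the KVS, one for the byte
    // limit the storage server passed in (mirrors readRangeBytesReturnedHistogram
    // and readRangeBytesLimitHistogram in the patch).
    LinearHistogram bytesReturned(/*width=*/64 * 1024, /*nBuckets=*/32);
    LinearHistogram bytesLimit(/*width=*/64 * 1024, /*nBuckets=*/32);

    // Hypothetical per-read observations.
    struct Read { uint64_t returned, limit; };
    Read reads[] = { { 12000, 80000 }, { 80000, 80000 }, { 512, 1000000 } };
    for (const Read& r : reads) {
        bytesReturned.sample(r.returned);
        bytesLimit.sample(r.limit);
    }

    // Comparing the two distributions shows how often reads come back far
    // smaller than the limit they were given.
    for (size_t i = 0; i < bytesReturned.buckets.size(); ++i) {
        if (bytesReturned.buckets[i] || bytesLimit.buckets[i]) {
            std::printf("bucket %zu: returned=%" PRIu64 " limit=%" PRIu64 "\n",
                        i, bytesReturned.buckets[i], bytesLimit.buckets[i]);
        }
    }
    return 0;
}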