From aa43956fbd72cc73371138fecdbb889f7eda1251 Mon Sep 17 00:00:00 2001
From: He Liu <86634338+liquid-helium@users.noreply.github.com>
Date: Fri, 8 Jul 2022 15:17:38 -0700
Subject: [PATCH] Sharded RocksDB (#7540)

* Some fixes.

* Enabled ShardedRocksDB in IKeyValueStore.h

* Added unit test for ShardedRocks for \xff\xff key space read/write.

* Resolved comments.

* Return empty read results if the physical shards are not initialized.

Co-authored-by: He Liu
---
 .../KeyValueStoreShardedRocksDB.actor.cpp    | 255 ++++++++++++++----
 fdbserver/include/fdbserver/IKeyValueStore.h |   7 +-
 2 files changed, 203 insertions(+), 59 deletions(-)

diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
index 9f6f911a42..40531e0d8f 100644
--- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
@@ -474,6 +474,16 @@ struct PhysicalShard {
         this->readIterPool->refreshIterators();
     }
 
+    std::string toString() {
+        std::string ret = "[ID]: " + this->id + ", [CF]: ";
+        if (initialized()) {
+            ret += std::to_string(this->cf->GetID());
+        } else {
+            ret += "Not initialized";
+        }
+        return ret;
+    }
+
     ~PhysicalShard() {
         if (!deletePending)
             return;
@@ -497,7 +507,7 @@ struct PhysicalShard {
     std::atomic<bool> isInitialized;
 };
 
-int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
+int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
     if (rowLimit == 0 || byteLimit == 0) {
         return 0;
     }
@@ -509,14 +519,14 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
     rocksdb::Status s;
     auto options = getReadOptions();
     // TODO: define single shard read timeout.
-    const uint64_t deadlineMircos = shard->physicalShard->db->GetEnv()->NowMicros() + readRangeTimeout * 1000000;
+    const uint64_t deadlineMircos = shard->db->GetEnv()->NowMicros() + readRangeTimeout * 1000000;
     options.deadline = std::chrono::microseconds(deadlineMircos / 1000000);
 
     // When using a prefix extractor, ensure that keys are returned in order even if they cross
     // a prefix boundary.
     options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
     if (rowLimit >= 0) {
-        ReadIterator readIter = shard->physicalShard->readIterPool->getIterator();
+        ReadIterator readIter = shard->readIterPool->getIterator();
         auto cursor = readIter.iter;
         cursor->Seek(toSlice(range.begin));
         while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
@@ -531,9 +541,9 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
             cursor->Next();
         }
         s = cursor->status();
-        shard->physicalShard->readIterPool->returnIterator(readIter);
+        shard->readIterPool->returnIterator(readIter);
     } else {
-        ReadIterator readIter = shard->physicalShard->readIterPool->getIterator();
+        ReadIterator readIter = shard->readIterPool->getIterator();
         auto cursor = readIter.iter;
         cursor->SeekForPrev(toSlice(range.end));
         if (cursor->Valid() && toStringRef(cursor->key()) == range.end) {
@@ -551,7 +561,7 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
             cursor->Prev();
         }
         s = cursor->status();
-        shard->physicalShard->readIterPool->returnIterator(readIter);
+        shard->readIterPool->returnIterator(readIter);
     }
 
     if (!s.ok()) {
@@ -566,9 +576,11 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
 
 // Manages physical shards and maintains logical shard mapping.
 class ShardManager {
 public:
-    ShardManager(std::string path) : path(path), dataShardMap(nullptr, specialKeys.end) {}
+    ShardManager(std::string path, UID logId) : path(path), logId(logId), dataShardMap(nullptr, specialKeys.end) {}
+
     rocksdb::Status init() {
         // Open instance.
+        TraceEvent(SevVerbose, "ShardManagerInitBegin", this->logId).detail("DataPath", path);
         std::vector<std::string> columnFamilies;
         rocksdb::Options options = getOptions();
         rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, path, &columnFamilies);
@@ -608,12 +620,14 @@ public:
             TraceEvent(SevInfo, "ShardedRocskDB").detail("FoundShard", handle->GetName()).detail("Action", "Init");
         }
         RangeResult metadata;
-        DataShard shard = DataShard(prefixRange(shardMappingPrefix), metadataShard.get());
-        readRangeInDb(&shard, shard.range, UINT16_MAX, UINT16_MAX, &metadata);
+        readRangeInDb(metadataShard.get(), prefixRange(shardMappingPrefix), UINT16_MAX, UINT16_MAX, &metadata);
 
         std::vector<std::pair<KeyRange, std::string>> mapping = decodeShardMapping(metadata, shardMappingPrefix);
 
         for (const auto& [range, name] : mapping) {
+            TraceEvent(SevDebug, "ShardedRocksLoadPhysicalShard", this->logId)
+                .detail("Range", range)
+                .detail("PhysicalShard", name);
             auto it = physicalShards.find(name);
             // Create missing shards.
             if (it == physicalShards.end()) {
@@ -635,6 +649,7 @@ public:
             std::unique_ptr<DataShard> dataShard = std::make_unique<DataShard>(specialKeys, defaultShard.get());
             dataShardMap.insert(dataShard->range, dataShard.get());
             defaultShard->dataShards[specialKeys.begin.toString()] = std::move(dataShard);
+            physicalShards[defaultShard->id] = defaultShard;
 
             metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata");
             metadataShard->init();
@@ -650,15 +665,23 @@ public:
                 return status;
             }
             metadataShard->readIterPool->update();
+            TraceEvent(SevVerbose, "InitializeMetaDataShard", this->logId)
+                .detail("MetadataShardCF", metadataShard->cf->GetID());
         }
         physicalShards["kvs-metadata"] = metadataShard;
 
         writeBatch = std::make_unique<rocksdb::WriteBatch>();
         dirtyShards = std::make_unique<std::set<PhysicalShard*>>();
+
+        TraceEvent(SevDebug, "ShardManagerInitEnd", this->logId).detail("DataPath", path);
         return status;
     }
 
-    DataShard* getDataShard(KeyRef key) { return dataShardMap.rangeContaining(key).value(); }
+    DataShard* getDataShard(KeyRef key) {
+        DataShard* shard = dataShardMap[key];
+        ASSERT(shard == nullptr || shard->range.contains(key));
+        return shard;
+    }
 
     std::vector<DataShard*> getDataShardsByRange(KeyRangeRef range) {
         std::vector<DataShard*> result;
@@ -678,28 +701,43 @@ public:
     }
 
     PhysicalShard* addRange(KeyRange range, std::string id) {
+        TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId)
+            .detail("Range", range)
+            .detail("PhysicalShardID", id);
         // Newly added range should not overlap with any existing range.
-        std::shared_ptr<PhysicalShard> shard;
-        auto it = physicalShards.find(id);
-        if (it == physicalShards.end()) {
-            shard = std::make_shared<PhysicalShard>(db, id);
-            physicalShards[id] = shard;
-        } else {
-            shard = it->second;
+        auto ranges = dataShardMap.intersectingRanges(range);
+
+        for (auto it = ranges.begin(); it != ranges.end(); ++it) {
+            if (it.value() != nullptr && it.value()->physicalShard->id != id) {
+                TraceEvent(SevError, "ShardedRocksAddOverlappingRanges")
+                    .detail("IntersectingRange", it->range())
+                    .detail("DataShardRange", it->value()->range)
+                    .detail("PhysicalShard", it->value()->physicalShard->toString());
+            }
         }
+
+        auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id));
+        std::shared_ptr<PhysicalShard>& shard = it->second;
+
         auto dataShard = std::make_unique<DataShard>(range, shard.get());
         dataShardMap.insert(range, dataShard.get());
         shard->dataShards[range.begin.toString()] = std::move(dataShard);
-        TraceEvent(SevDebug, "ShardedRocksDB")
-            .detail("Action", "AddRange")
-            .detail("BeginKey", range.begin)
-            .detail("EndKey", range.end);
+
+        validate();
+
+        TraceEvent(SevVerbose, "ShardedRocksAddRangeEnd", this->logId)
+            .detail("Range", range)
+            .detail("PhysicalShardID", id);
+
         return shard.get();
     }
 
     std::vector<std::string> removeRange(KeyRange range) {
+        TraceEvent(SevVerbose, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
+
         std::vector<std::string> shardIds;
+
+        std::vector<DataShard*> newShards;
         auto ranges = dataShardMap.intersectingRanges(range);
 
         for (auto it = ranges.begin(); it != ranges.end(); ++it) {
@@ -710,35 +748,60 @@ public:
                     .detail("EndKey", range.end);
                 continue;
             }
-
             auto existingShard = it.value()->physicalShard;
             auto shardRange = it.range();
+            TraceEvent(SevDebug, "ShardedRocksRemoveRange")
+                .detail("Range", range)
+                .detail("IntersectingRange", shardRange)
+                .detail("DataShardRange", it.value()->range)
+                .detail("PhysicalShard", existingShard->toString());
+
             ASSERT(it.value()->range == shardRange); // Ranges should be consistent.
             if (range.contains(shardRange)) {
                 existingShard->dataShards.erase(shardRange.begin.toString());
+                TraceEvent(SevInfo, "ShardedRocksRemovedRange")
+                    .detail("Range", range)
+                    .detail("RemovedRange", shardRange)
+                    .detail("PhysicalShard", existingShard->toString());
                 if (existingShard->dataShards.size() == 0) {
                     TraceEvent(SevDebug, "ShardedRocksDB").detail("EmptyShardId", existingShard->id);
                     shardIds.push_back(existingShard->id);
                 }
                 continue;
             }
-
             // Range modification could result in more than one segments. Remove the original segment key here.
             existingShard->dataShards.erase(shardRange.begin.toString());
             if (shardRange.begin < range.begin) {
-                existingShard->dataShards[shardRange.begin.toString()] =
+                auto dataShard =
                     std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
-                logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE);
+                newShards.push_back(dataShard.get());
+                const std::string msg = "Shrink shard from " + Traceable<KeyRangeRef>::toString(shardRange) + " to " +
+                                        Traceable<KeyRangeRef>::toString(dataShard->range);
+                existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
+                logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
             }
 
             if (shardRange.end > range.end) {
-                existingShard->dataShards[range.end.toString()] =
+                auto dataShard =
                     std::make_unique<DataShard>(KeyRange(KeyRangeRef(range.end, shardRange.end)), existingShard);
-                logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE);
+                newShards.push_back(dataShard.get());
+                const std::string msg = "Shrink shard from " + Traceable<KeyRangeRef>::toString(shardRange) + " to " +
+                                        Traceable<KeyRangeRef>::toString(dataShard->range);
+                existingShard->dataShards[range.end.toString()] = std::move(dataShard);
+                logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
             }
         }
 
+        dataShardMap.insert(range, nullptr);
+        for (DataShard* shard : newShards) {
+            dataShardMap.insert(shard->range, shard);
+        }
+
+        validate();
+
+        TraceEvent(SevVerbose, "ShardedRocksRemoveRangeEnd", this->logId).detail("Range", range);
+
         return shardIds;
     }
 
@@ -760,8 +823,17 @@ public:
             TraceEvent(SevError, "ShardedRocksDB").detail("Error", "write to non-exist shard").detail("WriteKey", key);
             return;
         }
+        TraceEvent(SevVerbose, "ShardManagerPut", this->logId)
+            .detail("WriteKey", key)
+            .detail("Value", value)
+            .detail("MapRange", it.range())
+            .detail("ShardRange", it.value()->range);
+        ASSERT(it.value()->range == (KeyRangeRef)it.range());
+        ASSERT(writeBatch != nullptr);
+        ASSERT(dirtyShards != nullptr);
         writeBatch->Put(it.value()->physicalShard->cf, toSlice(key), toSlice(value));
         dirtyShards->insert(it.value()->physicalShard);
+        TraceEvent(SevVerbose, "ShardManagerPutEnd", this->logId).detail("WriteKey", key).detail("Value", value);
     }
 
     void clear(KeyRef key) {
@@ -894,8 +966,24 @@ public:
         return dataMap;
     }
 
+    void validate() {
+        TraceEvent(SevVerbose, "ValidateShardManager", this->logId);
+        for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
+            TraceEvent e(SevVerbose, "ValidateDataShardMap", this->logId);
+            e.detail("Range", s->range());
+            const DataShard* shard = s->value();
+            e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
+            if (shard != nullptr) {
+                e.detail("PhysicalShard", shard->physicalShard->id);
+            } else {
+                e.detail("Shard", "Empty");
+            }
+        }
+    }
+
 private:
-    std::string path;
+    const std::string path;
+    const UID logId;
     rocksdb::DB* db = nullptr;
     std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
     // Stores mapping between cf id and cf handle, used during compaction.
@@ -1480,15 +1568,17 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
     }
 
     struct Writer : IThreadPoolReceiver {
+        const UID logId;
         int threadIndex;
         std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
         std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
         std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
 
-        explicit Writer(int threadIndex,
+        explicit Writer(UID logId,
+                        int threadIndex,
                         std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap,
                         std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
-          : threadIndex(threadIndex), columnFamilyMap(columnFamilyMap), rocksDBMetrics(rocksDBMetrics),
+          : logId(logId), threadIndex(threadIndex), columnFamilyMap(columnFamilyMap), rocksDBMetrics(rocksDBMetrics),
             rateLimiter(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0
                             ? rocksdb::NewGenericRateLimiter(
                                   SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, // rate_bytes_per_sec
@@ -1726,14 +1816,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
     };
 
     struct Reader : IThreadPoolReceiver {
+        const UID logId;
         double readValueTimeout;
         double readValuePrefixTimeout;
         double readRangeTimeout;
         int threadIndex;
         std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
 
-        explicit Reader(int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
-          : threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics) {
+        explicit Reader(UID logId, int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
+          : logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics) {
             if (g_network->isSimulated()) {
                 // In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have
                 // very high load and single read thread cannot process all the load within the timeouts.
@@ -1751,7 +1842,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
         struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
             Key key;
-            DataShard* shard;
+            PhysicalShard* shard;
             Optional<UID> debugID;
             double startTime;
             bool getHistograms;
             bool getPerfContext;
             bool logShardMemUsage;
             ThreadReturnPromise<Optional<Value>> result;
 
-            ReadValueAction(KeyRef key, DataShard* shard, Optional<UID> debugID)
+            ReadValueAction(KeyRef key, PhysicalShard* shard, Optional<UID> debugID)
               : key(key), shard(shard), debugID(debugID), startTime(timer_monotonic()),
                 getHistograms(
                     (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
@@ -1797,13 +1888,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
             rocksdb::PinnableSlice value;
             auto options = getReadOptions();
-            auto db = a.shard->physicalShard->db;
+            auto db = a.shard->db;
             uint64_t deadlineMircos =
                 db->GetEnv()->NowMicros() + (readValueTimeout - (timer_monotonic() - a.startTime)) * 1000000;
             std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
             options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
 
             double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
-            auto s = db->Get(options, a.shard->physicalShard->cf, toSlice(a.key), &value);
+            auto s = db->Get(options, a.shard->cf, toSlice(a.key), &value);
 
             if (a.getHistograms) {
                 rocksDBMetrics->getReadValueGetHistogram(threadIndex)
@@ -1836,14 +1927,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
         struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
             Key key;
             int maxLength;
-            DataShard* shard;
+            PhysicalShard* shard;
             Optional<UID> debugID;
             double startTime;
             bool getHistograms;
             bool getPerfContext;
             bool logShardMemUsage;
             ThreadReturnPromise<Optional<Value>> result;
-            ReadValuePrefixAction(Key key, int maxLength, DataShard* shard, Optional<UID> debugID)
+
+            ReadValuePrefixAction(Key key, int maxLength, PhysicalShard* shard, Optional<UID> debugID)
               : key(key), maxLength(maxLength), shard(shard), debugID(debugID), startTime(timer_monotonic()),
                 getHistograms(
                     (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
@@ -1854,6 +1946,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
                         : false){};
             double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
         };
+
         void action(ReadValuePrefixAction& a) {
             if (a.getPerfContext) {
                 rocksDBMetrics->resetPerfContext();
@@ -1881,14 +1974,14 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
             rocksdb::PinnableSlice value;
             auto options = getReadOptions();
-            auto db = a.shard->physicalShard->db;
+            auto db = a.shard->db;
             uint64_t deadlineMircos =
                 db->GetEnv()->NowMicros() + (readValuePrefixTimeout - (timer_monotonic() - a.startTime)) * 1000000;
             std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
             options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
 
             double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
-            auto s = db->Get(options, a.shard->physicalShard->cf, toSlice(a.key), &value);
+            auto s = db->Get(options, a.shard->cf, toSlice(a.key), &value);
 
             if (a.getHistograms) {
                 rocksDBMetrics->getReadPrefixGetHistogram(threadIndex)
@@ -1922,7 +2015,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
         struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
             KeyRange keys;
-            std::vector<DataShard*> shards;
+            std::vector<std::pair<PhysicalShard*, KeyRange>> shardRanges;
             int rowLimit, byteLimit;
             double startTime;
             bool getHistograms;
             bool getPerfContext;
             bool logShardMemUsage;
             ThreadReturnPromise<RangeResult> result;
             ReadRangeAction(KeyRange keys, std::vector<DataShard*> shards, int rowLimit, int byteLimit)
-              : keys(keys), shards(shards), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
+              : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
                 getHistograms(
                     (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
                 getPerfContext(
                     (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) &&
                             (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE)
                         ? true
-                        : false) {}
+                        : false) {
+                for (const DataShard* shard : shards) {
+                    shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
+                }
+            }
             double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
         };
+
         void action(ReadRangeAction& a) {
             if (a.getPerfContext) {
                 rocksDBMetrics->resetPerfContext();
@@ -1966,7 +2064,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
             }
             if (rowLimit < 0) {
                 // Reverses the shard order so we could read range in reverse direction.
-                std::reverse(a.shards.begin(), a.shards.end());
+                std::reverse(a.shardRanges.begin(), a.shardRanges.end());
             }
 
             // TODO: consider multi-thread reads. It's possible to read multiple shards in parallel. However, the number
             // performance improvement when the actual number of rows to read is very small.
             int accumulatedBytes = 0;
             int numShards = 0;
-            for (auto shard : a.shards) {
-                auto range = shard->range;
-                KeyRange readRange = KeyRange(a.keys & range);
-
-                auto bytesRead = readRangeInDb(shard, readRange, rowLimit, byteLimit, &result);
+            for (auto& [shard, range] : a.shardRanges) {
+                ASSERT(shard != nullptr && shard->initialized());
+                auto bytesRead = readRangeInDb(shard, range, rowLimit, byteLimit, &result);
                 if (bytesRead < 0) {
                     // Error reading an instance.
                     a.result.sendError(internal_error());
@@ -2031,7 +2127,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
         numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
         numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
         errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(errorListener->getFuture()),
-        shardManager(path), rocksDBMetrics(new RocksDBMetrics()) {
+        shardManager(path, id), rocksDBMetrics(new RocksDBMetrics()) {
         // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine
         // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
         // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@@ -2049,10 +2145,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
             writeThread = createGenericThreadPool();
             readThreads = createGenericThreadPool();
         }
-        writeThread->addThread(new Writer(0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
+        writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
         TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
         for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
-            readThreads->addThread(new Reader(i, rocksDBMetrics), "fdb-rocksdb-re");
+            readThreads->addThread(new Reader(id, i, rocksDBMetrics), "fdb-rocksdb-re");
         }
     }
@@ -2081,7 +2177,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 
     void close() override { doClose(this, false); }
 
-    KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
+    KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB); }
 
     Future<Void> init() override {
         if (openFuture.isValid()) {
@@ -2162,15 +2258,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
     }
 
     Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType type, Optional<UID> debugID) override {
-        auto shard = shardManager.getDataShard(key);
-        if (shard == nullptr) {
+        auto* shard = shardManager.getDataShard(key);
+        if (shard == nullptr || !shard->physicalShard->initialized()) {
             // TODO: read non-exist system key range should not cause an error.
             TraceEvent(SevError, "ShardedRocksDB").detail("Detail", "Read non-exist key range").detail("ReadKey", key);
             return Optional<Value>();
         }
 
         if (!shouldThrottle(type, key)) {
-            auto a = new Reader::ReadValueAction(key, shard, debugID);
+            auto a = new Reader::ReadValueAction(key, shard->physicalShard, debugID);
             auto res = a->result.getFuture();
             readThreads->post(a);
             return res;
@@ -2180,7 +2276,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
         int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
 
         checkWaiters(semaphore, maxWaiters);
-        auto a = std::make_unique<Reader::ReadValueAction>(key, shard, debugID);
+        auto a = std::make_unique<Reader::ReadValueAction>(key, shard->physicalShard, debugID);
         return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
     }
 
@@ -2188,8 +2284,28 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
                                             int maxLength,
                                             IKeyValueStore::ReadType type,
                                             Optional<UID> debugID) override {
+        auto* shard = shardManager.getDataShard(key);
+        if (shard == nullptr || !shard->physicalShard->initialized()) {
+            // TODO: read non-exist system key range should not cause an error.
+            TraceEvent(SevWarnAlways, "ShardedRocksDB")
+                .detail("Detail", "Read non-exist key range")
+                .detail("ReadKey", key);
+            return Optional<Value>();
+        }
 
-        return Optional<Value>();
+        if (!shouldThrottle(type, key)) {
+            auto a = new Reader::ReadValuePrefixAction(key, maxLength, shard->physicalShard, debugID);
+            auto res = a->result.getFuture();
+            readThreads->post(a);
+            return res;
+        }
+
+        auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
+        int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
+
+        checkWaiters(semaphore, maxWaiters);
+        auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, shard->physicalShard, debugID);
+        return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
     }
 
     ACTOR static Future<RangeResult> read(Reader::ReadRangeAction* action,
@@ -2216,8 +2332,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
                                   int rowLimit,
                                   int byteLimit,
                                   IKeyValueStore::ReadType type) override {
+        TraceEvent(SevVerbose, "ShardedRocksReadRangeBegin", this->id).detail("Range", keys);
         auto shards = shardManager.getDataShardsByRange(keys);
 
+        for (DataShard* shard : shards) {
+            if (shard == nullptr || !shard->physicalShard->initialized()) {
+                return RangeResult();
+            }
+        }
+
         if (!shouldThrottle(type, keys.begin)) {
             auto a = new Reader::ReadRangeAction(keys, shards, rowLimit, byteLimit);
             auto res = a->result.getFuture();
@@ -2594,6 +2717,8 @@ TEST_CASE("noSim/ShardedRocksDB/ShardOps") {
 
 TEST_CASE("noSim/ShardedRocksDB/Metadata") {
     state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
+    state Key testSpecialKey = "\xff\xff/TestKey"_sr;
+    state Value testSpecialValue = "\xff\xff/TestValue"_sr;
     platform::eraseDirectoryRecursive(rocksDBTestDir);
 
     state ShardedRocksDBKeyValueStore* rocksdbStore =
@@ -2601,6 +2726,15 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
     state IKeyValueStore* kvStore = rocksdbStore;
     wait(kvStore->init());
 
+    Optional<Value> val = wait(kvStore->readValue(testSpecialKey));
+    ASSERT(!val.present());
+
+    kvStore->set(KeyValueRef(testSpecialKey, testSpecialValue));
+    wait(kvStore->commit(false));
+
+    Optional<Value> val = wait(kvStore->readValue(testSpecialKey));
+    ASSERT(val.get() == testSpecialValue);
+
     // Add some ranges.
     std::vector<Future<Void>> addRangeFutures;
     addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "c"_sr), "shard-1"));
@@ -2620,6 +2754,11 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
     kvStore = rocksdbStore;
     wait(kvStore->init());
 
+    {
+        Optional<Value> val = wait(kvStore->readValue(testSpecialKey));
+        ASSERT(val.get() == testSpecialValue);
+    }
+
     // Read value back.
     Optional<Value> val = wait(kvStore->readValue("a1"_sr));
     ASSERT(val == Optional<Value>("foo"_sr));
diff --git a/fdbserver/include/fdbserver/IKeyValueStore.h b/fdbserver/include/fdbserver/IKeyValueStore.h
index 048213b235..942ff37539 100644
--- a/fdbserver/include/fdbserver/IKeyValueStore.h
+++ b/fdbserver/include/fdbserver/IKeyValueStore.h
@@ -161,6 +161,11 @@ extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
                                             KeyValueStoreType storeType,
                                             bool checkChecksums = false,
                                             bool checkIntegrity = false);
+extern IKeyValueStore* keyValueStoreShardedRocksDB(std::string const& path,
+                                                   UID logID,
+                                                   KeyValueStoreType storeType,
+                                                   bool checkChecksums = false,
+                                                   bool checkIntegrity = false);
 extern IKeyValueStore* keyValueStoreMemory(std::string const& basename,
                                            UID logID,
                                            int64_t memoryLimit,
@@ -204,7 +209,7 @@ inline IKeyValueStore* openKVStore(KeyValueStoreType storeType,
     case KeyValueStoreType::SSD_ROCKSDB_V1:
         return keyValueStoreRocksDB(filename, logID, storeType);
     case KeyValueStoreType::SSD_SHARDED_ROCKSDB:
-        return keyValueStoreRocksDB(filename, logID, storeType); // TODO: to replace the KVS in the future
+        return keyValueStoreShardedRocksDB(filename, logID, storeType, checkChecksums, checkIntegrity);
     case KeyValueStoreType::MEMORY_RADIXTREE:
         return keyValueStoreMemory(filename,
                                    logID,
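Editor's note (not part of the patch): a minimal usage sketch of the surface this PR wires up, based only on the interfaces visible above (openKVStore, IKeyValueStore::addRange, the SSD_SHARDED_ROCKSDB store type, and the commit/read calls exercised by the "noSim/ShardedRocksDB/Metadata" test). It assumes the openKVStore(storeType, filename, logID, memoryLimit) overload declared in IKeyValueStore.h; the function name exampleShardedRocksUsage and the key/value literals are illustrative, and error handling plus store shutdown are omitted.

    // Sketch: drive the sharded engine through the generic IKeyValueStore factory.
    ACTOR Future<Void> exampleShardedRocksUsage(std::string path, UID logId) {
        // After this patch, SSD_SHARDED_ROCKSDB routes to keyValueStoreShardedRocksDB()
        // instead of the plain RocksDB engine (see the IKeyValueStore.h hunk above).
        state IKeyValueStore* kvStore = openKVStore(KeyValueStoreType::SSD_SHARDED_ROCKSDB, path, logId, 0);
        wait(kvStore->init());

        // Assign the key range to a physical shard before writing user keys into it;
        // per this PR, reads against an uninitialized physical shard return empty
        // results rather than erroring.
        wait(kvStore->addRange(KeyRangeRef("a"_sr, "c"_sr), "shard-1"));

        kvStore->set(KeyValueRef("a1"_sr, "foo"_sr));
        wait(kvStore->commit(false));

        Optional<Value> val = wait(kvStore->readValue("a1"_sr));
        ASSERT(val == Optional<Value>("foo"_sr));
        return Void();
    }

The addRange-before-write ordering mirrors how the ShardManager maintains its KeyRangeMap: a write to a key with no mapped DataShard is rejected with a "write to non-exist shard" trace event, so range assignment is a prerequisite, not an optimization.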