Merge branch 'feature/dd-refactor-incremental' of https://github.com/sfc-gh-xwang/foundationdb into feature/dd-refactor-incremental

Xiaoxi Wang 2022-07-08 16:35:31 -07:00
commit 5ce1d7342d
3 changed files with 215 additions and 72 deletions


@ -498,19 +498,18 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
//}
int numShards = splitKeys.size() - 1;
if (deterministicRandom()->random01() < 0.01) {
TraceEvent("RelocateShardStartSplitx100", self->distributorId)
.detail("Begin", keys.begin)
.detail("End", keys.end)
.detail("MaxBytes", shardBounds.max.bytes)
.detail("MetricsBytes", metrics.bytes)
.detail("Bandwidth",
bandwidthStatus == BandwidthStatusHigh ? "High"
: bandwidthStatus == BandwidthStatusNormal ? "Normal"
: "Low")
.detail("BytesPerKSec", metrics.bytesPerKSecond)
.detail("NumShards", numShards);
}
TraceEvent("RelocateShardStartSplit", self->distributorId)
.suppressFor(1.0)
.detail("Begin", keys.begin)
.detail("End", keys.end)
.detail("MaxBytes", shardBounds.max.bytes)
.detail("MetricsBytes", metrics.bytes)
.detail("Bandwidth",
bandwidthStatus == BandwidthStatusHigh ? "High"
: bandwidthStatus == BandwidthStatusNormal ? "Normal"
: "Low")
.detail("BytesPerKSec", metrics.bytesPerKSecond)
.detail("NumShards", numShards);
if (numShards > 1) {
int skipRange = deterministicRandom()->randomInt(0, numShards);
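
The change above swaps 1% random sampling for time-based suppression of the split trace event. A minimal sketch of the two throttling idioms, with hypothetical event and variable names:

    // Hypothetical sketch; names are illustrative, not from this diff.
    // Random sampling: emits ~1% of events, so volume still scales with load.
    if (deterministicRandom()->random01() < 0.01) {
        TraceEvent("HotPathEventx100", distributorId).detail("NumShards", numShards);
    }
    // Time-based suppression: at most one event per second, so bursts stay bounded.
    TraceEvent("HotPathEvent", distributorId).suppressFor(1.0).detail("NumShards", numShards);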


@ -474,6 +474,16 @@ struct PhysicalShard {
this->readIterPool->refreshIterators();
}
std::string toString() {
std::string ret = "[ID]: " + this->id + ", [CF]: ";
if (initialized()) {
ret += std::to_string(this->cf->GetID());
} else {
ret += "Not initialized";
}
return ret;
}
~PhysicalShard() {
if (!deletePending)
return;
@ -497,7 +507,7 @@ struct PhysicalShard {
std::atomic<bool> isInitialized;
};
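
The new PhysicalShard::toString() helper feeds the shard-identity details used by the trace events added further down in addRange/removeRange. Example output, assuming a shard id of "shard-1" and a column family id of 7:

    // e.g. existingShard->toString() returns
    //   "[ID]: shard-1, [CF]: 7"               after init(), or
    //   "[ID]: shard-1, [CF]: Not initialized" before the column family exists.
    TraceEvent(SevDebug, "ShardedRocksRemoveRange").detail("PhysicalShard", existingShard->toString());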
int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
if (rowLimit == 0 || byteLimit == 0) {
return 0;
}
@ -509,14 +519,14 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
rocksdb::Status s;
auto options = getReadOptions();
// TODO: define single shard read timeout.
const uint64_t deadlineMicros = shard->physicalShard->db->GetEnv()->NowMicros() + readRangeTimeout * 1000000;
const uint64_t deadlineMicros = shard->db->GetEnv()->NowMicros() + readRangeTimeout * 1000000;
options.deadline = std::chrono::microseconds(deadlineMicros / 1000000);
// When using a prefix extractor, ensure that keys are returned in order even if they cross
// a prefix boundary.
options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
if (rowLimit >= 0) {
ReadIterator readIter = shard->physicalShard->readIterPool->getIterator();
ReadIterator readIter = shard->readIterPool->getIterator();
auto cursor = readIter.iter;
cursor->Seek(toSlice(range.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
@ -531,9 +541,9 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
cursor->Next();
}
s = cursor->status();
shard->physicalShard->readIterPool->returnIterator(readIter);
shard->readIterPool->returnIterator(readIter);
} else {
ReadIterator readIter = shard->physicalShard->readIterPool->getIterator();
ReadIterator readIter = shard->readIterPool->getIterator();
auto cursor = readIter.iter;
cursor->SeekForPrev(toSlice(range.end));
if (cursor->Valid() && toStringRef(cursor->key()) == range.end) {
@ -551,7 +561,7 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
cursor->Prev();
}
s = cursor->status();
shard->physicalShard->readIterPool->returnIterator(readIter);
shard->readIterPool->returnIterator(readIter);
}
if (!s.ok()) {
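
readRangeInDb now takes the owning PhysicalShard and an explicit range instead of a DataShard, so callers clip the query themselves. A hedged call-site sketch; the variable names here are assumptions, not code from this diff:

    // Before: the range was implicit in the logical shard.
    //   readRangeInDb(dataShard, dataShard->range, rowLimit, byteLimit, &result);
    // After: pass the physical shard and the query clipped to the shard's range.
    RangeResult result;
    int bytesRead = readRangeInDb(
        dataShard->physicalShard, queryRange & dataShard->range, rowLimit, byteLimit, &result);
    if (bytesRead < 0) {
        // Read failed on this shard; callers surface internal_error() (see ReadRangeAction below).
    }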
@ -566,9 +576,11 @@ int readRangeInDb(DataShard* shard, const KeyRangeRef& range, int rowLimit, int
// Manages physical shards and maintains logical shard mapping.
class ShardManager {
public:
ShardManager(std::string path) : path(path), dataShardMap(nullptr, specialKeys.end) {}
ShardManager(std::string path, UID logId) : path(path), logId(logId), dataShardMap(nullptr, specialKeys.end) {}
rocksdb::Status init() {
// Open instance.
TraceEvent(SevVerbose, "ShardManagerInitBegin", this->logId).detail("DataPath", path);
std::vector<std::string> columnFamilies;
rocksdb::Options options = getOptions();
rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, path, &columnFamilies);
@ -608,12 +620,14 @@ public:
TraceEvent(SevInfo, "ShardedRocksDB").detail("FoundShard", handle->GetName()).detail("Action", "Init");
}
RangeResult metadata;
DataShard shard = DataShard(prefixRange(shardMappingPrefix), metadataShard.get());
readRangeInDb(&shard, shard.range, UINT16_MAX, UINT16_MAX, &metadata);
readRangeInDb(metadataShard.get(), prefixRange(shardMappingPrefix), UINT16_MAX, UINT16_MAX, &metadata);
std::vector<std::pair<KeyRange, std::string>> mapping = decodeShardMapping(metadata, shardMappingPrefix);
for (const auto& [range, name] : mapping) {
TraceEvent(SevDebug, "ShardedRocksLoadPhysicalShard", this->logId)
.detail("Range", range)
.detail("PhysicalShard", name);
auto it = physicalShards.find(name);
// Create missing shards.
if (it == physicalShards.end()) {
@ -635,6 +649,7 @@ public:
std::unique_ptr<DataShard> dataShard = std::make_unique<DataShard>(specialKeys, defaultShard.get());
dataShardMap.insert(dataShard->range, dataShard.get());
defaultShard->dataShards[specialKeys.begin.toString()] = std::move(dataShard);
physicalShards[defaultShard->id] = defaultShard;
metadataShard = std::make_shared<PhysicalShard>(db, "kvs-metadata");
metadataShard->init();
@ -650,15 +665,23 @@ public:
return status;
}
metadataShard->readIterPool->update();
TraceEvent(SevVerbose, "InitializeMetaDataShard", this->logId)
.detail("MetadataShardCF", metadataShard->cf->GetID());
}
physicalShards["kvs-metadata"] = metadataShard;
writeBatch = std::make_unique<rocksdb::WriteBatch>();
dirtyShards = std::make_unique<std::set<PhysicalShard*>>();
TraceEvent(SevDebug, "ShardManagerInitEnd", this->logId).detail("DataPath", path);
return status;
}
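
For orientation, ShardManager keeps a two-level mapping: a range map from key to logical DataShard, and a hash map from shard id to the PhysicalShard that owns a RocksDB column family (the member declarations appear near the end of the class). A simplified sketch of the lookup path:

    // Simplified sketch of the structures init() wires up:
    //   KeyRangeMap<DataShard*> dataShardMap;                  // key range -> logical shard
    //   std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
    DataShard* ds = dataShardMap[key];  // range lookup; nullptr for unassigned ranges
    if (ds != nullptr) {
        PhysicalShard* ps = ds->physicalShard;  // reads/writes target ps->cf on the shared db
    }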
DataShard* getDataShard(KeyRef key) { return dataShardMap.rangeContaining(key).value(); }
DataShard* getDataShard(KeyRef key) {
DataShard* shard = dataShardMap[key];
ASSERT(shard == nullptr || shard->range.contains(key));
return shard;
}
std::vector<DataShard*> getDataShardsByRange(KeyRangeRef range) {
std::vector<DataShard*> result;
@ -678,28 +701,43 @@ public:
}
PhysicalShard* addRange(KeyRange range, std::string id) {
TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId)
.detail("Range", range)
.detail("PhysicalShardID", id);
// Newly added range should not overlap with any existing range.
std::shared_ptr<PhysicalShard> shard;
auto it = physicalShards.find(id);
if (it == physicalShards.end()) {
shard = std::make_shared<PhysicalShard>(db, id);
physicalShards[id] = shard;
} else {
shard = it->second;
auto ranges = dataShardMap.intersectingRanges(range);
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
if (it.value() != nullptr && it.value()->physicalShard->id != id) {
TraceEvent(SevError, "ShardedRocksAddOverlappingRanges")
.detail("IntersectingRange", it->range())
.detail("DataShardRange", it->value()->range)
.detail("PhysicalShard", it->value()->physicalShard->toString());
}
}
auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id));
std::shared_ptr<PhysicalShard>& shard = it->second;
auto dataShard = std::make_unique<DataShard>(range, shard.get());
dataShardMap.insert(range, dataShard.get());
shard->dataShards[range.begin.toString()] = std::move(dataShard);
TraceEvent(SevDebug, "ShardedRocksDB")
.detail("Action", "AddRange")
.detail("BeginKey", range.begin)
.detail("EndKey", range.end);
validate();
TraceEvent(SevVerbose, "ShardedRocksAddRangeEnd", this->logId)
.detail("Range", range)
.detail("PhysicalShardID", id);
return shard.get();
}
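
A hedged usage sketch of addRange, with keys borrowed from the unit test at the end of this commit: disjoint ranges may share one physical shard, and the SevError overlap event above fires only when an intersecting range already belongs to a different shard id.

    shardManager.addRange(KeyRange(KeyRangeRef("a"_sr, "c"_sr)), "shard-1");
    // Same id: emplace() finds the existing PhysicalShard and adds a second DataShard to it.
    shardManager.addRange(KeyRange(KeyRangeRef("m"_sr, "n"_sr)), "shard-1");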
std::vector<std::string> removeRange(KeyRange range) {
TraceEvent(SevVerbose, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
std::vector<std::string> shardIds;
std::vector<DataShard*> newShards;
auto ranges = dataShardMap.intersectingRanges(range);
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
@ -710,35 +748,60 @@ public:
.detail("EndKey", range.end);
continue;
}
auto existingShard = it.value()->physicalShard;
auto shardRange = it.range();
TraceEvent(SevDebug, "ShardedRocksRemoveRange")
.detail("Range", range)
.detail("IntersectingRange", shardRange)
.detail("DataShardRange", it.value()->range)
.detail("PhysicalShard", existingShard->toString());
ASSERT(it.value()->range == shardRange); // Ranges should be consistent.
if (range.contains(shardRange)) {
existingShard->dataShards.erase(shardRange.begin.toString());
TraceEvent(SevInfo, "ShardedRocksRemovedRange")
.detail("Range", range)
.detail("RemovedRange", shardRange)
.detail("PhysicalShard", existingShard->toString());
if (existingShard->dataShards.size() == 0) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("EmptyShardId", existingShard->id);
shardIds.push_back(existingShard->id);
}
continue;
}
// Range modification could result in more than one segment. Remove the original segment key here.
existingShard->dataShards.erase(shardRange.begin.toString());
if (shardRange.begin < range.begin) {
existingShard->dataShards[shardRange.begin.toString()] =
auto dataShard =
std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE);
newShards.push_back(dataShard.get());
const std::string msg = "Shrink shard from " + Traceable<KeyRangeRef>::toString(shardRange) + " to " +
Traceable<KeyRangeRef>::toString(dataShard->range);
existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
}
if (shardRange.end > range.end) {
existingShard->dataShards[range.end.toString()] =
auto dataShard =
std::make_unique<DataShard>(KeyRange(KeyRangeRef(range.end, shardRange.end)), existingShard);
logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE);
newShards.push_back(dataShard.get());
const std::string msg = "Shrink shard from " + Traceable<KeyRangeRef>::toString(shardRange) + " to " +
Traceable<KeyRangeRef>::toString(dataShard->range);
existingShard->dataShards[range.end.toString()] = std::move(dataShard);
logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
}
}
dataShardMap.insert(range, nullptr);
for (DataShard* shard : newShards) {
dataShardMap.insert(shard->range, shard);
}
validate();
TraceEvent(SevVerbose, "ShardedRocksRemoveRangeEnd", this->logId).detail("Range", range);
return shardIds;
}
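
A worked example of the splitting logic above, using hypothetical keys:

    // Physical shard "s" initially owns ["a","z") as one DataShard.
    shardManager.addRange(KeyRange(KeyRangeRef("a"_sr, "z"_sr)), "s");
    std::vector<std::string> emptied = shardManager.removeRange(KeyRange(KeyRangeRef("g"_sr, "m"_sr)));
    // dataShardMap now holds: ["a","g") -> s, ["g","m") -> nullptr, ["m","z") -> s.
    // "s" still has data shards, so `emptied` is empty; an id is returned only when a
    // physical shard loses its last DataShard and becomes a deletion candidate.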
@ -760,8 +823,17 @@ public:
TraceEvent(SevError, "ShardedRocksDB").detail("Error", "write to non-existent shard").detail("WriteKey", key);
return;
}
TraceEvent(SevVerbose, "ShardManagerPut", this->logId)
.detail("WriteKey", key)
.detail("Value", value)
.detail("MapRange", it.range())
.detail("ShardRange", it.value()->range);
ASSERT(it.value()->range == (KeyRangeRef)it.range());
ASSERT(writeBatch != nullptr);
ASSERT(dirtyShards != nullptr);
writeBatch->Put(it.value()->physicalShard->cf, toSlice(key), toSlice(value));
dirtyShards->insert(it.value()->physicalShard);
TraceEvent(SevVerbose, "ShardManagerPutEnd", this->logId).detail("WriteKey", key).detail("Value", value);
}
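
put() only stages the mutation. A hedged sketch of the deferred-write flow it participates in; the commit side is outside this hunk, so the flush step below is an assumption based on the refreshIterators/update calls visible elsewhere in the file:

    // Mutations for all shards accumulate in a single rocksdb::WriteBatch, while
    // dirtyShards records which physical shards were touched.
    writeBatch->Put(it.value()->physicalShard->cf, toSlice(key), toSlice(value));
    dirtyShards->insert(it.value()->physicalShard);
    // Assumed commit-side counterpart: write the batch once, then refresh only
    // the iterator pools of shards that actually changed.
    //   db->Write(rocksdb::WriteOptions(), writeBatch.get());
    //   for (auto* s : *dirtyShards) s->readIterPool->update();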
void clear(KeyRef key) {
@ -894,8 +966,24 @@ public:
return dataMap;
}
void validate() {
TraceEvent(SevVerbose, "ValidateShardManager", this->logId);
for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
TraceEvent e(SevVerbose, "ValidateDataShardMap", this->logId);
e.detail("Range", s->range());
const DataShard* shard = s->value();
e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
if (shard != nullptr) {
e.detail("PhysicalShard", shard->physicalShard->id);
} else {
e.detail("Shard", "Empty");
}
}
}
private:
std::string path;
const std::string path;
const UID logId;
rocksdb::DB* db = nullptr;
std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
// Stores mapping between cf id and cf handle, used during compaction.
@ -1480,15 +1568,17 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
struct Writer : IThreadPoolReceiver {
const UID logId;
int threadIndex;
std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
explicit Writer(int threadIndex,
explicit Writer(UID logId,
int threadIndex,
std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap,
std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
: threadIndex(threadIndex), columnFamilyMap(columnFamilyMap), rocksDBMetrics(rocksDBMetrics),
: logId(logId), threadIndex(threadIndex), columnFamilyMap(columnFamilyMap), rocksDBMetrics(rocksDBMetrics),
rateLimiter(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0
? rocksdb::NewGenericRateLimiter(
SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, // rate_bytes_per_sec
@ -1726,14 +1816,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
};
struct Reader : IThreadPoolReceiver {
const UID logId;
double readValueTimeout;
double readValuePrefixTimeout;
double readRangeTimeout;
int threadIndex;
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
explicit Reader(int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
: threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics) {
explicit Reader(UID logId, int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
: logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics) {
if (g_network->isSimulated()) {
// In simulation, increase the read operation timeouts to 5 minutes, as some of the tests have
// very high load and a single read thread cannot process all the load within the timeouts.
@ -1751,7 +1842,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
Key key;
DataShard* shard;
PhysicalShard* shard;
Optional<UID> debugID;
double startTime;
bool getHistograms;
@ -1759,7 +1850,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
bool logShardMemUsage;
ThreadReturnPromise<Optional<Value>> result;
ReadValueAction(KeyRef key, DataShard* shard, Optional<UID> debugID)
ReadValueAction(KeyRef key, PhysicalShard* shard, Optional<UID> debugID)
: key(key), shard(shard), debugID(debugID), startTime(timer_monotonic()),
getHistograms(
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
@ -1797,13 +1888,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
rocksdb::PinnableSlice value;
auto options = getReadOptions();
auto db = a.shard->physicalShard->db;
auto db = a.shard->db;
uint64_t deadlineMicros =
db->GetEnv()->NowMicros() + (readValueTimeout - (timer_monotonic() - a.startTime)) * 1000000;
std::chrono::seconds deadlineSeconds(deadlineMicros / 1000000);
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto s = db->Get(options, a.shard->physicalShard->cf, toSlice(a.key), &value);
auto s = db->Get(options, a.shard->cf, toSlice(a.key), &value);
if (a.getHistograms) {
rocksDBMetrics->getReadValueGetHistogram(threadIndex)
@ -1836,14 +1927,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
Key key;
int maxLength;
DataShard* shard;
PhysicalShard* shard;
Optional<UID> debugID;
double startTime;
bool getHistograms;
bool getPerfContext;
bool logShardMemUsage;
ThreadReturnPromise<Optional<Value>> result;
ReadValuePrefixAction(Key key, int maxLength, DataShard* shard, Optional<UID> debugID)
ReadValuePrefixAction(Key key, int maxLength, PhysicalShard* shard, Optional<UID> debugID)
: key(key), maxLength(maxLength), shard(shard), debugID(debugID), startTime(timer_monotonic()),
getHistograms(
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
@ -1854,6 +1946,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
: false){};
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValuePrefixAction& a) {
if (a.getPerfContext) {
rocksDBMetrics->resetPerfContext();
@ -1881,14 +1974,14 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
rocksdb::PinnableSlice value;
auto options = getReadOptions();
auto db = a.shard->physicalShard->db;
auto db = a.shard->db;
uint64_t deadlineMicros =
db->GetEnv()->NowMicros() + (readValuePrefixTimeout - (timer_monotonic() - a.startTime)) * 1000000;
std::chrono::seconds deadlineSeconds(deadlineMicros / 1000000);
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto s = db->Get(options, a.shard->physicalShard->cf, toSlice(a.key), &value);
auto s = db->Get(options, a.shard->cf, toSlice(a.key), &value);
if (a.getHistograms) {
rocksDBMetrics->getReadPrefixGetHistogram(threadIndex)
@ -1922,7 +2015,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
KeyRange keys;
std::vector<DataShard*> shards;
std::vector<std::pair<PhysicalShard*, KeyRange>> shardRanges;
int rowLimit, byteLimit;
double startTime;
bool getHistograms;
@ -1930,16 +2023,21 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
bool logShardMemUsage;
ThreadReturnPromise<RangeResult> result;
ReadRangeAction(KeyRange keys, std::vector<DataShard*> shards, int rowLimit, int byteLimit)
: keys(keys), shards(shards), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
getHistograms(
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
getPerfContext(
(SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) &&
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE)
? true
: false) {}
: false) {
for (const DataShard* shard : shards) {
shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
}
}
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
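
The constructor change above resolves each DataShard into a (PhysicalShard*, clipped range) pair on the calling thread, so the reader thread never touches a DataShard that the main thread might concurrently re-map. The clipping relies on the key-range intersection operator:

    // keys & shard->range is the intersection of the query with the shard's range,
    // e.g. keys = ["b","m") and shard->range = ["a","c") yield ["b","c").
    for (const DataShard* shard : shards) {
        shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
    }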
void action(ReadRangeAction& a) {
if (a.getPerfContext) {
rocksDBMetrics->resetPerfContext();
@ -1966,7 +2064,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
if (rowLimit < 0) {
// Reverse the shard order so we can read the range in the reverse direction.
std::reverse(a.shards.begin(), a.shards.end());
std::reverse(a.shardRanges.begin(), a.shardRanges.end());
}
// TODO: consider multi-thread reads. It's possible to read multiple shards in parallel. However, the number
@ -1975,11 +2073,9 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
// performance improvement when the actual number of rows to read is very small.
int accumulatedBytes = 0;
int numShards = 0;
for (auto shard : a.shards) {
auto range = shard->range;
KeyRange readRange = KeyRange(a.keys & range);
auto bytesRead = readRangeInDb(shard, readRange, rowLimit, byteLimit, &result);
for (auto& [shard, range] : a.shardRanges) {
ASSERT(shard != nullptr && shard->initialized());
auto bytesRead = readRangeInDb(shard, range, rowLimit, byteLimit, &result);
if (bytesRead < 0) {
// Error reading an instance.
a.result.sendError(internal_error());
@ -2031,7 +2127,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(errorListener->getFuture()),
shardManager(path), rocksDBMetrics(new RocksDBMetrics()) {
shardManager(path, id), rocksDBMetrics(new RocksDBMetrics()) {
// In simulation, run the reader/writer threads as Coro threads (i.e., in the network thread). The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@ -2049,10 +2145,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
}
writeThread->addThread(new Writer(0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(i, rocksDBMetrics), "fdb-rocksdb-re");
readThreads->addThread(new Reader(id, i, rocksDBMetrics), "fdb-rocksdb-re");
}
}
@ -2081,7 +2177,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
void close() override { doClose(this, false); }
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB); }
Future<Void> init() override {
if (openFuture.isValid()) {
@ -2162,15 +2258,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType type, Optional<UID> debugID) override {
auto shard = shardManager.getDataShard(key);
if (shard == nullptr) {
auto* shard = shardManager.getDataShard(key);
if (shard == nullptr || !shard->physicalShard->initialized()) {
// TODO: reading a non-existent system key range should not cause an error.
TraceEvent(SevError, "ShardedRocksDB").detail("Detail", "Read non-existent key range").detail("ReadKey", key);
return Optional<Value>();
}
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValueAction(key, shard, debugID);
auto a = new Reader::ReadValueAction(key, shard->physicalShard, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
@ -2180,7 +2276,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValueAction>(key, shard, debugID);
auto a = std::make_unique<Reader::ReadValueAction>(key, shard->physicalShard, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
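
For context, the gating pattern above (repeated for readValuePrefix below) gives FETCH traffic its own semaphore and waiter budget so bulk data movement cannot starve foreground reads. Condensed from the method above:

    if (!shouldThrottle(type, key)) {
        auto* a = new Reader::ReadValueAction(key, shard->physicalShard, debugID);
        auto res = a->result.getFuture();
        readThreads->post(a);  // fast path: bypasses queue limits entirely
        return res;
    }
    // Throttled path: FETCH and foreground reads draw from separate budgets.
    auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
    int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
    checkWaiters(semaphore, maxWaiters);  // fails the read once the hard queue limit is hit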
@ -2188,8 +2284,28 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
int maxLength,
IKeyValueStore::ReadType type,
Optional<UID> debugID) override {
auto* shard = shardManager.getDataShard(key);
if (shard == nullptr || !shard->physicalShard->initialized()) {
// TODO: reading a non-existent system key range should not cause an error.
TraceEvent(SevWarnAlways, "ShardedRocksDB")
.detail("Detail", "Read non-existent key range")
.detail("ReadKey", key);
return Optional<Value>();
}
return Optional<Value>();
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, shard->physicalShard, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, shard->physicalShard, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
ACTOR static Future<Standalone<RangeResultRef>> read(Reader::ReadRangeAction* action,
@ -2216,8 +2332,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
int rowLimit,
int byteLimit,
IKeyValueStore::ReadType type) override {
TraceEvent(SevVerbose, "ShardedRocksReadRangeBegin", this->id).detail("Range", keys);
auto shards = shardManager.getDataShardsByRange(keys);
for (DataShard* shard : shards) {
if (shard == nullptr || !shard->physicalShard->initialized()) {
return RangeResult();
}
}
if (!shouldThrottle(type, keys.begin)) {
auto a = new Reader::ReadRangeAction(keys, shards, rowLimit, byteLimit);
auto res = a->result.getFuture();
@ -2594,6 +2717,8 @@ TEST_CASE("noSim/ShardedRocksDB/ShardOps") {
TEST_CASE("noSim/ShardedRocksDB/Metadata") {
state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
state Key testSpecialKey = "\xff\xff/TestKey"_sr;
state Value testSpecialValue = "\xff\xff/TestValue"_sr;
platform::eraseDirectoryRecursive(rocksDBTestDir);
state ShardedRocksDBKeyValueStore* rocksdbStore =
@ -2601,6 +2726,15 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
state IKeyValueStore* kvStore = rocksdbStore;
wait(kvStore->init());
{
Optional<Value> val = wait(kvStore->readValue(testSpecialKey));
ASSERT(!val.present());
}
kvStore->set(KeyValueRef(testSpecialKey, testSpecialValue));
wait(kvStore->commit(false));
{
Optional<Value> val = wait(kvStore->readValue(testSpecialKey));
ASSERT(val.get() == testSpecialValue);
}
// Add some ranges.
std::vector<Future<Void>> addRangeFutures;
addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "c"_sr), "shard-1"));
@ -2620,6 +2754,11 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
kvStore = rocksdbStore;
wait(kvStore->init());
{
Optional<Value> val = wait(kvStore->readValue(testSpecialKey));
ASSERT(val.get() == testSpecialValue);
}
// Read value back.
Optional<Value> val = wait(kvStore->readValue("a1"_sr));
ASSERT(val == Optional<Value>("foo"_sr));


@ -161,6 +161,11 @@ extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
KeyValueStoreType storeType,
bool checkChecksums = false,
bool checkIntegrity = false);
extern IKeyValueStore* keyValueStoreShardedRocksDB(std::string const& path,
UID logID,
KeyValueStoreType storeType,
bool checkChecksums = false,
bool checkIntegrity = false);
extern IKeyValueStore* keyValueStoreMemory(std::string const& basename,
UID logID,
int64_t memoryLimit,
@ -204,7 +209,7 @@ inline IKeyValueStore* openKVStore(KeyValueStoreType storeType,
case KeyValueStoreType::SSD_ROCKSDB_V1:
return keyValueStoreRocksDB(filename, logID, storeType);
case KeyValueStoreType::SSD_SHARDED_ROCKSDB:
return keyValueStoreRocksDB(filename, logID, storeType); // TODO: to replace the KVS in the future
return keyValueStoreShardedRocksDB(filename, logID, storeType, checkChecksums, checkIntegrity);
case KeyValueStoreType::MEMORY_RADIXTREE:
return keyValueStoreMemory(filename,
logID,
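
Finally, a hypothetical caller-side sketch of the new dispatch; the openKVStore parameters are assumed from the declarations in this header, and the path is a placeholder:

    // Inside an ACTOR; path and UID are placeholders.
    IKeyValueStore* kvs = openKVStore(KeyValueStoreType::SSD_SHARDED_ROCKSDB,
                                      "/var/fdb/data/storage-1", // filename
                                      UID(),                     // logID
                                      0);                        // memoryLimit
    wait(kvs->init());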