Merge branch 'main' into fix-ub-in-keyafter

A.J. Beamon 2022-11-04 11:46:29 -07:00
commit ec088c98c1
5 changed files with 46 additions and 12 deletions


@@ -960,7 +960,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( KMS_CONNECTOR_TYPE, "RESTKmsConnector" );
// Blob granules
- init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually
+ init( BG_URL, isSimulated ? "file://simfdb/fdbblob/" : "" ); // TODO: store in system key space or something, eventually
bool buggifyMediumGranules = simulationMediumShards || (randomize && BUGGIFY);
// BlobGranuleVerify* simulation tests use "knobs", BlobGranuleCorrectness* use "tenant", default in real clusters is "knobs"
init( BG_METADATA_SOURCE, "knobs" );


@@ -155,7 +155,7 @@ struct ShardedRocksDBState {
std::shared_ptr<rocksdb::Cache> rocksdb_block_cache = nullptr;
- rocksdb::Slice toSlice(StringRef s) {
+ const rocksdb::Slice toSlice(StringRef s) {
return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
}
@@ -309,8 +309,20 @@ struct ReadIterator {
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
double creationTime;
- ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, rocksdb::ReadOptions& options)
+ KeyRange keyRange;
+ std::shared_ptr<rocksdb::Slice> beginSlice, endSlice;
+ ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const rocksdb::ReadOptions& options)
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
+ ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const KeyRange& range)
+ : index(index), inUse(true), creationTime(now()), keyRange(range) {
+ auto options = getReadOptions();
+ beginSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
+ options.iterate_lower_bound = beginSlice.get();
+ endSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
+ options.iterate_upper_bound = endSlice.get();
+ iter = std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
+ }
};
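Note: the new bounded constructor relies on RocksDB's ReadOptions::iterate_lower_bound / iterate_upper_bound fields, and the slices they point at must outlive the iterator, which is why beginSlice/endSlice are kept as members next to iter. A minimal standalone sketch of the same technique (the helper below is illustrative only, not part of this commit):

#include <memory>
#include <rocksdb/db.h>

// Create an iterator restricted to [begin, end). RocksDB does not copy the
// Slice objects referenced by iterate_lower_bound / iterate_upper_bound, so
// the caller must keep them (and the bytes they point at) alive for the
// iterator's lifetime, as ReadIterator does via its shared_ptr members.
std::unique_ptr<rocksdb::Iterator> makeBoundedIterator(rocksdb::DB* db,
                                                       rocksdb::ColumnFamilyHandle* cf,
                                                       const rocksdb::Slice* begin,
                                                       const rocksdb::Slice* end) {
    rocksdb::ReadOptions options;
    options.iterate_lower_bound = begin; // inclusive lower bound
    options.iterate_upper_bound = end;   // exclusive upper bound
    return std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
}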
/*
@@ -348,7 +360,8 @@ public:
}
// Called on every read operation.
- ReadIterator getIterator() {
+ ReadIterator getIterator(const KeyRange& range) {
+ // Shared iterators are not bounded.
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
@@ -364,7 +377,7 @@ public:
return iter;
} else {
index++;
- ReadIterator iter(cf, index, db, readRangeOptions);
+ ReadIterator iter(cf, index, db, range);
return iter;
}
}
@@ -511,7 +524,7 @@ struct PhysicalShard {
double deleteTimeSec;
};
- int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit, int byteLimit, RangeResult* result) {
+ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, int byteLimit, RangeResult* result) {
if (rowLimit == 0 || byteLimit == 0) {
return 0;
}
@@ -523,7 +536,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit,
// When using a prefix extractor, ensure that keys are returned in order even if they cross
// a prefix boundary.
if (rowLimit >= 0) {
- ReadIterator readIter = shard->readIterPool->getIterator();
+ ReadIterator readIter = shard->readIterPool->getIterator(range);
auto cursor = readIter.iter;
cursor->Seek(toSlice(range.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
@@ -540,7 +553,7 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef& range, int rowLimit,
s = cursor->status();
shard->readIterPool->returnIterator(readIter);
} else {
- ReadIterator readIter = shard->readIterPool->getIterator();
+ ReadIterator readIter = shard->readIterPool->getIterator(range);
auto cursor = readIter.iter;
cursor->SeekForPrev(toSlice(range.end));
if (cursor->Valid() && toStringRef(cursor->key()) == range.end) {
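The two branches above scan a now-bounded iterator forward (Seek from range.begin) or backward (SeekForPrev from range.end) while enforcing the row and byte limits. A simplified sketch of the forward case, with a plain std::vector standing in for FDB's RangeResult (illustrative only, not code from the commit):

#include <string>
#include <utility>
#include <vector>
#include <rocksdb/db.h>

// Forward scan mirroring the rowLimit/byteLimit accounting in readRangeInDb.
// The iterator is assumed to have been created with iterate_upper_bound set to
// range.end, so Valid() turns false at the bound; the real code above still
// compares cursor->key() against range.end explicitly as well.
int scanForward(rocksdb::Iterator* cursor,
                const rocksdb::Slice& begin,
                int rowLimit,
                int byteLimit,
                std::vector<std::pair<std::string, std::string>>* out) {
    int accumulatedBytes = 0;
    for (cursor->Seek(begin); cursor->Valid(); cursor->Next()) {
        out->emplace_back(cursor->key().ToString(), cursor->value().ToString());
        accumulatedBytes += static_cast<int>(cursor->key().size() + cursor->value().size());
        if (--rowLimit <= 0 || accumulatedBytes >= byteLimit) {
            break;
        }
    }
    return accumulatedBytes;
}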
@@ -2150,10 +2163,16 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
getHistograms(
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
+ std::set<PhysicalShard*> usedShards;
for (const DataShard* shard : shards) {
- if (shard != nullptr) {
- shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
- }
+ ASSERT(shard);
+ shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
+ usedShards.insert(shard->physicalShard);
}
+ if (usedShards.size() != shards.size()) {
+ TraceEvent("ReadRangeMetrics")
+ .detail("NumPhysicalShards", usedShards.size())
+ .detail("NumDataShards", shards.size());
+ }
}
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
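The usedShards bookkeeping added above records how many distinct physical shards back the data shards covered by one range read, and emits a ReadRangeMetrics trace event when the two counts differ. The de-duplication is just a std::set over physical-shard pointers; a stripped-down illustration with stand-in types (not the actual FDB classes):

#include <cstddef>
#include <set>
#include <vector>

// Stand-ins for illustration only; the real DataShard/PhysicalShard are
// defined in the sharded RocksDB key-value store.
struct PhysicalShard {};
struct DataShard {
    PhysicalShard* physicalShard;
};

// Count distinct physical shards behind a set of data shards, mirroring the
// usedShards set built in the ReadRangeAction constructor above.
std::size_t countPhysicalShards(const std::vector<const DataShard*>& shards) {
    std::set<const PhysicalShard*> used;
    for (const DataShard* shard : shards) {
        used.insert(shard->physicalShard);
    }
    return used.size();
}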


@@ -2149,7 +2149,7 @@ int main(int argc, char* argv[]) {
auto dataFolder = opts.dataFolder.size() ? opts.dataFolder : "simfdb";
std::vector<std::string> directories = platform::listDirectories(dataFolder);
- const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests" };
+ const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests", "fdbblob" };
for (const auto& dir : directories) {
if (dir.size() != 32 && allowedDirectories.count(dir) == 0 && dir.find("snap") == std::string::npos) {


@@ -44,6 +44,8 @@ const StringRef TLOG_MSGS_PTREE_UPDATES_LATENCY_HISTOGRAM = "TLogMsgsPTreeUpdate
const StringRef STORAGE_UPDATES_DURABLE_LATENCY_HISTOGRAM = "StorageUpdatesDurableLatency"_sr;
const StringRef STORAGE_COMMIT_LATENCY_HISTOGRAM = "StorageCommitLatency"_sr;
const StringRef SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM = "SSDurableVersionUpdateLatency"_sr;
+ const StringRef SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM = "SSReadRangeBytesReturned"_sr;
+ const StringRef SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM = "SSReadRangeBytesLimit"_sr;
struct StorageMetricSample {
IndexedSet<Key, int64_t> sample;


@@ -735,6 +735,9 @@ public:
Reference<Histogram> storageUpdatesDurableLatencyHistogram;
Reference<Histogram> storageCommitLatencyHistogram;
Reference<Histogram> ssDurableVersionUpdateLatencyHistogram;
+ // Histograms of requests sent to KVS.
+ Reference<Histogram> readRangeBytesReturnedHistogram;
+ Reference<Histogram> readRangeBytesLimitHistogram;
// watch map operations
Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key) const;
@@ -1296,6 +1299,12 @@ public:
ssDurableVersionUpdateLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
SS_DURABLE_VERSION_UPDATE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
+ readRangeBytesReturnedHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
+ SS_READ_RANGE_BYTES_RETURNED_HISTOGRAM,
+ Histogram::Unit::countLinear)),
+ readRangeBytesLimitHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP,
+ SS_READ_RANGE_BYTES_LIMIT_HISTOGRAM,
+ Histogram::Unit::countLinear)),
tag(invalidTag), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0),
storage(this, storage), shardChangeCounter(0), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
prevVersion(0), rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
@@ -3460,6 +3469,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options));
data->counters.kvScanBytes += atStorageVersion.logicalSize();
+ data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize());
+ data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
ASSERT(atStorageVersion.size() <= limit);
if (data->storageVersion() > version) {
@@ -3555,6 +3566,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
RangeResult atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, options));
data->counters.kvScanBytes += atStorageVersion.logicalSize();
+ data->readRangeBytesReturnedHistogram->sample(atStorageVersion.logicalSize());
+ data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version)