Mirror of https://github.com/apple/foundationdb.git
RocksDB storage: use single-key deletes instead of DeleteRange for the ClearRange operation. (#8452)
parent 9987da5a4f
commit a1eb1d4a48
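Context for the change: a RocksDB DeleteRange writes a range tombstone, and accumulated range tombstones can slow down later reads and iterator seeks over the affected key space; issuing one point Delete per existing key avoids range tombstones when the range is small. A minimal standalone sketch of the technique against plain RocksDB (clearRangeWithPointDeletes and its signature are illustrative, not part of this commit):

    #include <memory>
    #include "rocksdb/db.h"
    #include "rocksdb/write_batch.h"

    // Illustrative helper: replace one DeleteRange with per-key point Deletes
    // when the range is believed to be small. Falls back to DeleteRange if
    // iteration fails.
    void clearRangeWithPointDeletes(rocksdb::DB* db,
                                    rocksdb::WriteBatch* batch,
                                    const rocksdb::Slice& begin,
                                    const rocksdb::Slice& end) {
        rocksdb::ReadOptions options;
        options.iterate_lower_bound = &begin; // bounds confine the scan to [begin, end)
        options.iterate_upper_bound = &end;
        std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
        for (it->Seek(begin); it->Valid(); it->Next()) {
            batch->Delete(it->key()); // one point tombstone per existing key
        }
        if (!it->status().ok()) {
            batch->DeleteRange(begin, end); // fall back to a range tombstone
        }
    }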
@@ -421,6 +421,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	// Enable this knob only for experimental purposes; never enable it in production.
 	// If enabled, all committed in-memory memtable writes are lost on a crash.
 	init( ROCKSDB_DISABLE_WAL_EXPERIMENTAL, false );
+	// If ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE is enabled, disable the ENABLE_CLEAR_RANGE_EAGER_READS knob.
+	// These knobs have contrary functionality.
+	init( ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE, false ); if( randomize && BUGGIFY ) ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE = deterministicRandom()->coinflip() ? false : true;
+	init( ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT, 200000 ); // 200KB
 	// canCommit will delay ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD seconds, up to
 	// ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD times, if RocksDB is overloaded.
 	// Set ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD to 0 to disable.
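A rough illustration of how the two new knobs combine: the single-key path is taken only when the feature knob is on, metrics are available, and the range's estimated size falls under ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT. This helper is hypothetical, mirroring the condition in the RocksDB clear() further down:

    #include <cstdint>

    // Hypothetical gating helper; in the actual commit this condition appears
    // inline in RocksDBKeyValueStore::clear().
    bool usePointDeletes(bool knobEnabled, bool haveMetrics, int64_t estimatedRangeBytes) {
        const int64_t kBytesLimit = 200000; // ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT
        return knobEnabled && haveMetrics && estimatedRangeBytes < kBytesLimit;
    }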
@@ -788,7 +792,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1;
 	init( CHANGEFEEDSTREAM_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) CHANGEFEEDSTREAM_LIMIT_BYTES = 1;
 	init( BLOBWORKERSTATUSSTREAM_LIMIT_BYTES, 1e4 ); if( randomize && BUGGIFY ) BLOBWORKERSTATUSSTREAM_LIMIT_BYTES = 1;
-	init( ENABLE_CLEAR_RANGE_EAGER_READS, true );
+	init( ENABLE_CLEAR_RANGE_EAGER_READS, true ); if( randomize && BUGGIFY ) ENABLE_CLEAR_RANGE_EAGER_READS = deterministicRandom()->coinflip() ? false : true;
 	init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 );
 	init( QUICK_GET_VALUE_FALLBACK, true );
 	init( QUICK_GET_KEY_VALUES_FALLBACK, true );
@@ -345,6 +345,8 @@ public:
 	int ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD;
 	int ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD;
 	bool ROCKSDB_DISABLE_WAL_EXPERIMENTAL;
+	bool ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE;
+	int64_t ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT;
 	int64_t ROCKSDB_COMPACTION_READAHEAD_SIZE;
 	int64_t ROCKSDB_BLOCK_SIZE;
 	bool ENABLE_SHARDED_ROCKSDB;
@@ -53,7 +53,11 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
 	void set(KeyValueRef keyValue, const Arena* arena = nullptr) override {
 		store->set(KeyValueRef(keyValue.key, pack(keyValue.value)), arena);
 	}
-	void clear(KeyRangeRef range, const Arena* arena = nullptr) override { store->clear(range, arena); }
+	void clear(KeyRangeRef range,
+	           const StorageServerMetrics* storageMetrics = nullptr,
+	           const Arena* arena = nullptr) override {
+		store->clear(range, storageMetrics, arena);
+	}
 	Future<Void> commit(bool sequential = false) override { return store->commit(sequential); }

 	Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {
@@ -130,7 +130,7 @@ public:
 		}
 	}

-	void clear(KeyRangeRef range, const Arena* arena) override {
+	void clear(KeyRangeRef range, const StorageServerMetrics* storageMetrics, const Arena* arena) override {
 		// A commit that occurs with no available space returns Never, so we can throw out all modifications
 		if (getAvailableSize() <= 0)
 			return;
@@ -1846,22 +1846,52 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 	void set(KeyValueRef kv, const Arena*) override {
 		if (writeBatch == nullptr) {
 			writeBatch.reset(new rocksdb::WriteBatch());
+			keysSet.clear();
 		}
 		ASSERT(defaultFdbCF != nullptr);
 		writeBatch->Put(defaultFdbCF, toSlice(kv.key), toSlice(kv.value));
+		if (SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE) {
+			keysSet.insert(kv.key);
+		}
 	}

-	void clear(KeyRangeRef keyRange, const Arena*) override {
+	void clear(KeyRangeRef keyRange, const StorageServerMetrics* storageMetrics, const Arena*) override {
 		if (writeBatch == nullptr) {
 			writeBatch.reset(new rocksdb::WriteBatch());
+			keysSet.clear();
 		}

 		ASSERT(defaultFdbCF != nullptr);

 		if (keyRange.singleKeyRange()) {
 			writeBatch->Delete(defaultFdbCF, toSlice(keyRange.begin));
 		} else {
-			writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
+			if (SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE && storageMetrics != nullptr &&
+			    storageMetrics->byteSample.getEstimate(keyRange) <
+			        SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT) {
+				rocksdb::ReadOptions options = sharedState->getReadOptions();
+				auto beginSlice = toSlice(keyRange.begin);
+				auto endSlice = toSlice(keyRange.end);
+				options.iterate_lower_bound = &beginSlice;
+				options.iterate_upper_bound = &endSlice;
+				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options, defaultFdbCF));
+				cursor->Seek(toSlice(keyRange.begin));
+				while (cursor->Valid() && toStringRef(cursor->key()) < keyRange.end) {
+					writeBatch->Delete(defaultFdbCF, cursor->key());
+					cursor->Next();
+				}
+				if (!cursor->status().ok()) {
+					// If the range read iteration fails, fall back to a DeleteRange.
+					writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
+				} else {
+					auto it = keysSet.lower_bound(keyRange.begin);
+					while (it != keysSet.end() && *it < keyRange.end) {
+						writeBatch->Delete(defaultFdbCF, toSlice(*it));
+						it++;
+					}
+				}
+			} else {
+				writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
+			}
 		}
 	}
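A subtlety worth noting in the clear() above: db->NewIterator() reads only what is already in the database, not keys buffered in the still-uncommitted writeBatch, which is why set() records keys in keysSet and clear() replays the matching subset as point Deletes. A small sketch of that visibility gap against plain RocksDB (visibilityGapDemo is illustrative):

    #include <memory>
    #include "rocksdb/db.h"
    #include "rocksdb/write_batch.h"

    void visibilityGapDemo(rocksdb::DB* db) {
        rocksdb::WriteBatch batch;
        batch.Put("k1", "v1"); // buffered in the batch; invisible to DB iterators
        std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
        it->Seek("k1");
        // Unless "k1" was already durable in the DB, the iterator is not
        // positioned on it: range iteration alone would miss the buffered Put,
        // which is why clear() above also replays keysSet as point Deletes.
    }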
@@ -1890,6 +1920,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 		}
 		auto a = new Writer::CommitAction();
 		a->batchToCommit = std::move(writeBatch);
+		keysSet.clear();
 		auto res = a->done.getFuture();
 		writeThread->post(a);
 		return res;
@@ -2083,6 +2114,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
 	Promise<Void> closePromise;
 	Future<Void> openFuture;
 	std::unique_ptr<rocksdb::WriteBatch> writeBatch;
+	std::set<Key> keysSet;
 	Optional<Future<Void>> metrics;
 	FlowLock readSemaphore;
 	int numReadWaiters;
@@ -1596,7 +1596,9 @@ public:
 	StorageBytes getStorageBytes() const override;

 	void set(KeyValueRef keyValue, const Arena* arena = nullptr) override;
-	void clear(KeyRangeRef range, const Arena* arena = nullptr) override;
+	void clear(KeyRangeRef range,
+	           const StorageServerMetrics* storageMetrics = nullptr,
+	           const Arena* arena = nullptr) override;
 	Future<Void> commit(bool sequential = false) override;

 	Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override;
@@ -2215,7 +2217,7 @@ void KeyValueStoreSQLite::set(KeyValueRef keyValue, const Arena* arena) {
 	++writesRequested;
 	writeThread->post(new Writer::SetAction(keyValue));
 }
-void KeyValueStoreSQLite::clear(KeyRangeRef range, const Arena* arena) {
+void KeyValueStoreSQLite::clear(KeyRangeRef range, const StorageServerMetrics* storageMetrics, const Arena* arena) {
 	++writesRequested;
 	writeThread->post(new Writer::ClearAction(range));
 }
@@ -2316,7 +2316,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {

 	void set(KeyValueRef kv, const Arena*) override { shardManager.put(kv.key, kv.value); }

-	void clear(KeyRangeRef range, const Arena*) override {
+	void clear(KeyRangeRef range, const StorageServerMetrics*, const Arena*) override {
 		if (range.singleKeyRange()) {
 			shardManager.clear(range.begin);
 		} else {
@@ -8187,7 +8187,9 @@ public:

 	Future<Void> getError() const override { return delayed(m_error.getFuture()); };

-	void clear(KeyRangeRef range, const Arena* arena = 0) override {
+	void clear(KeyRangeRef range,
+	           const StorageServerMetrics* storageMetrics = nullptr,
+	           const Arena* arena = 0) override {
 		debug_printf("CLEAR %s\n", printable(range).c_str());
 		m_tree->clear(range);
 	}
@@ -29,6 +29,7 @@
 #include "fdbserver/IClosable.h"
 #include "fdbserver/IPageEncryptionKeyProvider.actor.h"
 #include "fdbserver/ServerDBInfo.h"
+#include "fdbserver/StorageMetrics.h"

 struct CheckpointRequest {
 	const Version version; // The FDB version at which the checkpoint is created.
@@ -52,7 +53,9 @@ public:
 	// persistRangeMapping().
 	virtual bool shardAware() const { return false; }
 	virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
-	virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
+	virtual void clear(KeyRangeRef range,
+	                   const StorageServerMetrics* storageMetrics = nullptr,
+	                   const Arena* arena = nullptr) = 0;
 	virtual Future<Void> canCommit() { return Void(); }
 	virtual Future<Void> commit(
 	    bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable
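Since the new storageMetrics parameter defaults to nullptr, existing call sites of the form storage->clear(range) continue to compile, and implementations with no use for the byte sample can simply ignore the pointer. A self-contained analogue of that interface shape (Store and Metrics are hypothetical stand-ins, not FDB types):

    // Self-contained analogue of the interface change: the defaulted middle
    // parameter lets old call sites compile unchanged.
    struct Metrics;

    struct Store {
        virtual void clear(const char* begin,
                           const char* end,
                           const Metrics* storageMetrics = nullptr) = 0;
        virtual ~Store() = default;
    };

    void caller(Store& s, const Metrics* m) {
        s.clear("a", "b");    // old style: storageMetrics defaults to nullptr
        s.clear("a", "b", m); // new style: passes metrics for size estimation
    }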
@@ -390,7 +390,9 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
 	void set(KeyValueRef keyValue, const Arena* arena = nullptr) override {
 		interf.set.send(IKVSSetRequest{ keyValue, ReplyPromise<Void>() });
 	}
-	void clear(KeyRangeRef range, const Arena* arena = nullptr) override {
+	void clear(KeyRangeRef range,
+	           const StorageServerMetrics* storageMetrics = nullptr,
+	           const Arena* arena = nullptr) override {
 		interf.clear.send(IKVSClearRequest{ range, ReplyPromise<Void>() });
 	}
@@ -9474,7 +9474,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
 }

 void StorageServerDisk::clearRange(KeyRangeRef keys) {
-	storage->clear(keys);
+	storage->clear(keys, &data->metrics);
 	++(*kvClearRanges);
 }

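Passing &data->metrics here is what feeds byteSample.getEstimate(keyRange) in the RocksDB clear() above; the estimate is built from sampled key-value sizes, so the byte limit acts as an approximate cutoff rather than an exact one. A toy analogue of such a range estimate (estimateRangeBytes is illustrative, not the FDB implementation):

    #include <cstdint>
    #include <map>
    #include <string>

    // Toy byte sample: sampled key -> sampled byte count. A range estimate sums
    // sampled sizes inside [begin, end); real FDB scales samples probabilistically,
    // so treat this as illustrative only.
    int64_t estimateRangeBytes(const std::map<std::string, int64_t>& sample,
                               const std::string& begin,
                               const std::string& end) {
        int64_t total = 0;
        for (auto it = sample.lower_bound(begin); it != sample.end() && it->first < end; ++it)
            total += it->second;
        return total;
    }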
@@ -9488,7 +9488,7 @@ void StorageServerDisk::writeMutation(MutationRef mutation) {
 		storage->set(KeyValueRef(mutation.param1, mutation.param2));
 		*kvCommitLogicalBytes += mutation.expectedSize();
 	} else if (mutation.type == MutationRef::ClearRange) {
-		storage->clear(KeyRangeRef(mutation.param1, mutation.param2));
+		storage->clear(KeyRangeRef(mutation.param1, mutation.param2), &data->metrics);
 		++(*kvClearRanges);
 	} else
 		ASSERT(false);
@@ -9503,7 +9503,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
 			storage->set(KeyValueRef(m.param1, m.param2));
 			*kvCommitLogicalBytes += m.expectedSize();
 		} else if (m.type == MutationRef::ClearRange) {
-			storage->clear(KeyRangeRef(m.param1, m.param2));
+			storage->clear(KeyRangeRef(m.param1, m.param2), &data->metrics);
 			++(*kvClearRanges);
 		}
 	}
@@ -9932,7 +9932,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 			++data->counters.kvSystemClearRanges;
 			// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
 			// DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
-			storage->clear(clearRange);
+			storage->clear(clearRange, &data->metrics);
 			++data->counters.kvSystemClearRanges;
 			data->byteSampleApplyClear(clearRange, invalidVersion);
 		}