diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index dca0e22058..3dc2dd8528 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -421,6 +421,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi // Enable this knob only for experminatal purpose, never enable this in production. // If enabled, all the committed in-memory memtable writes are lost on a crash. init( ROCKSDB_DISABLE_WAL_EXPERIMENTAL, false ); + // If ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE is enabled, disable ENABLE_CLEAR_RANGE_EAGER_READS knob. + // These knobs have contrary functionality. + init( ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE, false ); if( randomize && BUGGIFY ) ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE = deterministicRandom()->coinflip() ? false : true; + init( ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT, 200000 ); // 200KB // Can commit will delay ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD seconds for // ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD times, if rocksdb overloaded. // Set ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD to 0, to disable @@ -788,7 +792,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1; init( CHANGEFEEDSTREAM_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) CHANGEFEEDSTREAM_LIMIT_BYTES = 1; init( BLOBWORKERSTATUSSTREAM_LIMIT_BYTES, 1e4 ); if( randomize && BUGGIFY ) BLOBWORKERSTATUSSTREAM_LIMIT_BYTES = 1; - init( ENABLE_CLEAR_RANGE_EAGER_READS, true ); + init( ENABLE_CLEAR_RANGE_EAGER_READS, true ); if( randomize && BUGGIFY ) ENABLE_CLEAR_RANGE_EAGER_READS = deterministicRandom()->coinflip() ? false : true; init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 ); init( QUICK_GET_VALUE_FALLBACK, true ); init( QUICK_GET_KEY_VALUES_FALLBACK, true ); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 8b172bd438..0f9cc0ccaa 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -345,6 +345,8 @@ public: int ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD; int ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD; bool ROCKSDB_DISABLE_WAL_EXPERIMENTAL; + bool ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE; + int64_t ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT; int64_t ROCKSDB_COMPACTION_READAHEAD_SIZE; int64_t ROCKSDB_BLOCK_SIZE; bool ENABLE_SHARDED_ROCKSDB; diff --git a/fdbserver/KeyValueStoreCompressTestData.actor.cpp b/fdbserver/KeyValueStoreCompressTestData.actor.cpp index a5098baf4e..7aa99b21ba 100644 --- a/fdbserver/KeyValueStoreCompressTestData.actor.cpp +++ b/fdbserver/KeyValueStoreCompressTestData.actor.cpp @@ -53,7 +53,11 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore { void set(KeyValueRef keyValue, const Arena* arena = nullptr) override { store->set(KeyValueRef(keyValue.key, pack(keyValue.value)), arena); } - void clear(KeyRangeRef range, const Arena* arena = nullptr) override { store->clear(range, arena); } + void clear(KeyRangeRef range, + const StorageServerMetrics* storageMetrics = nullptr, + const Arena* arena = nullptr) override { + store->clear(range, storageMetrics, arena); + } Future<Void> commit(bool sequential = false) override { return store->commit(sequential); } Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override { diff --git a/fdbserver/KeyValueStoreMemory.actor.cpp b/fdbserver/KeyValueStoreMemory.actor.cpp index e055bab003..73478aaa41 100644 --- a/fdbserver/KeyValueStoreMemory.actor.cpp +++ b/fdbserver/KeyValueStoreMemory.actor.cpp @@ -130,7 +130,7 @@ public: } } - void clear(KeyRangeRef range, const Arena* arena) override { + void clear(KeyRangeRef range, const StorageServerMetrics* storageMetrics, const Arena* arena) override { // A commit that occurs with no available space returns Never, so we can throw out all modifications if (getAvailableSize() <= 0) return; diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 911c074f06..19470b9877 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -1846,22 +1846,52 @@ struct RocksDBKeyValueStore : IKeyValueStore { void set(KeyValueRef kv, const Arena*) override { if (writeBatch == nullptr) { writeBatch.reset(new rocksdb::WriteBatch()); + keysSet.clear(); } ASSERT(defaultFdbCF != nullptr); writeBatch->Put(defaultFdbCF, toSlice(kv.key), toSlice(kv.value)); + if (SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE) { + keysSet.insert(kv.key); + } } - void clear(KeyRangeRef keyRange, const Arena*) override { + void clear(KeyRangeRef keyRange, const StorageServerMetrics* storageMetrics, const Arena*) override { if (writeBatch == nullptr) { writeBatch.reset(new rocksdb::WriteBatch()); + keysSet.clear(); } ASSERT(defaultFdbCF != nullptr); - if (keyRange.singleKeyRange()) { writeBatch->Delete(defaultFdbCF, toSlice(keyRange.begin)); } else { - writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end)); + if (SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_ON_CLEARRANGE && storageMetrics != nullptr && + storageMetrics->byteSample.getEstimate(keyRange) < + SERVER_KNOBS->ROCKSDB_SINGLEKEY_DELETES_BYTES_LIMIT) { + rocksdb::ReadOptions options = sharedState->getReadOptions(); + auto beginSlice = toSlice(keyRange.begin); + auto endSlice = toSlice(keyRange.end); + options.iterate_lower_bound = &beginSlice; + options.iterate_upper_bound = &endSlice; + auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options, defaultFdbCF)); + cursor->Seek(toSlice(keyRange.begin)); + while (cursor->Valid() && toStringRef(cursor->key()) < keyRange.end) { + writeBatch->Delete(defaultFdbCF, cursor->key()); + cursor->Next(); + } + if (!cursor->status().ok()) { + // if readrange iteration fails, then do a deleteRange. + writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end)); + } else { + auto it = keysSet.lower_bound(keyRange.begin); + while (it != keysSet.end() && *it < keyRange.end) { + writeBatch->Delete(defaultFdbCF, toSlice(*it)); + it++; + } + } + } else { + writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end)); + } } } @@ -1890,6 +1920,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { } auto a = new Writer::CommitAction(); a->batchToCommit = std::move(writeBatch); + keysSet.clear(); auto res = a->done.getFuture(); writeThread->post(a); return res; @@ -2083,6 +2114,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { Promise<Void> closePromise; Future<Void> openFuture; std::unique_ptr<rocksdb::WriteBatch> writeBatch; + std::set<Key> keysSet; Optional<Future<Void>> metrics; FlowLock readSemaphore; int numReadWaiters; diff --git a/fdbserver/KeyValueStoreSQLite.actor.cpp b/fdbserver/KeyValueStoreSQLite.actor.cpp index 634beb190c..1c95c64e8a 100644 --- a/fdbserver/KeyValueStoreSQLite.actor.cpp +++ b/fdbserver/KeyValueStoreSQLite.actor.cpp @@ -1596,7 +1596,9 @@ public: StorageBytes getStorageBytes() const override; void set(KeyValueRef keyValue, const Arena* arena = nullptr) override; - void clear(KeyRangeRef range, const Arena* arena = nullptr) override; + void clear(KeyRangeRef range, + const StorageServerMetrics* storageMetrics = nullptr, + const Arena* arena = nullptr) override; Future<Void> commit(bool sequential = false) override; Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> optionss) override; @@ -2215,7 +2217,7 @@ void KeyValueStoreSQLite::set(KeyValueRef keyValue, const Arena* arena) { ++writesRequested; writeThread->post(new Writer::SetAction(keyValue)); } -void KeyValueStoreSQLite::clear(KeyRangeRef range, const Arena* arena) { +void KeyValueStoreSQLite::clear(KeyRangeRef range, const StorageServerMetrics* storageMetrics, const Arena* arena) { ++writesRequested; writeThread->post(new Writer::ClearAction(range)); } diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 64a65dec48..150b1c7213 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -2316,7 +2316,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { void set(KeyValueRef kv, const Arena*) override { shardManager.put(kv.key, kv.value); } - void clear(KeyRangeRef range, const Arena*) override { + void clear(KeyRangeRef range, const StorageServerMetrics*, const Arena*) override { if (range.singleKeyRange()) { shardManager.clear(range.begin); } else { diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index dbe1973b3e..2c39ea23aa 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -8187,7 +8187,9 @@ public: Future<Void> getError() const override { return delayed(m_error.getFuture()); }; - void clear(KeyRangeRef range, const Arena* arena = 0) override { + void clear(KeyRangeRef range, + const StorageServerMetrics* storageMetrics = nullptr, + const Arena* arena = 0) override { debug_printf("CLEAR %s\n", printable(range).c_str()); m_tree->clear(range); } diff --git a/fdbserver/include/fdbserver/IKeyValueStore.h b/fdbserver/include/fdbserver/IKeyValueStore.h index b9679d9f92..3069527d7a 100644 --- a/fdbserver/include/fdbserver/IKeyValueStore.h +++ b/fdbserver/include/fdbserver/IKeyValueStore.h @@ -29,6 +29,7 @@ #include "fdbserver/IClosable.h" #include "fdbserver/IPageEncryptionKeyProvider.actor.h" #include "fdbserver/ServerDBInfo.h" +#include "fdbserver/StorageMetrics.h" struct CheckpointRequest { const Version version; // The FDB version at which the checkpoint is created. @@ -52,7 +53,9 @@ public: // persistRangeMapping(). virtual bool shardAware() const { return false; } virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0; - virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0; + virtual void clear(KeyRangeRef range, + const StorageServerMetrics* storageMetrics = nullptr, + const Arena* arena = nullptr) = 0; virtual Future<Void> canCommit() { return Void(); } virtual Future<Void> commit( bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable diff --git a/fdbserver/include/fdbserver/RemoteIKeyValueStore.actor.h b/fdbserver/include/fdbserver/RemoteIKeyValueStore.actor.h index e110c480c8..745df0dc9d 100644 --- a/fdbserver/include/fdbserver/RemoteIKeyValueStore.actor.h +++ b/fdbserver/include/fdbserver/RemoteIKeyValueStore.actor.h @@ -390,7 +390,9 @@ struct RemoteIKeyValueStore : public IKeyValueStore { void set(KeyValueRef keyValue, const Arena* arena = nullptr) override { interf.set.send(IKVSSetRequest{ keyValue, ReplyPromise<Void>() }); } - void clear(KeyRangeRef range, const Arena* arena = nullptr) override { + void clear(KeyRangeRef range, + const StorageServerMetrics* storageMetrics = nullptr, + const Arena* arena = nullptr) override { interf.clear.send(IKVSClearRequest{ range, ReplyPromise<Void>() }); } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index f942d8394f..aebec03a41 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -9474,7 +9474,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) } void StorageServerDisk::clearRange(KeyRangeRef keys) { - storage->clear(keys); + storage->clear(keys, &data->metrics); ++(*kvClearRanges); } @@ -9488,7 +9488,7 @@ void StorageServerDisk::writeMutation(MutationRef mutation) { storage->set(KeyValueRef(mutation.param1, mutation.param2)); *kvCommitLogicalBytes += mutation.expectedSize(); } else if (mutation.type == MutationRef::ClearRange) { - storage->clear(KeyRangeRef(mutation.param1, mutation.param2)); + storage->clear(KeyRangeRef(mutation.param1, mutation.param2), &data->metrics); ++(*kvClearRanges); } else ASSERT(false); @@ -9503,7 +9503,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations, storage->set(KeyValueRef(m.param1, m.param2)); *kvCommitLogicalBytes += m.expectedSize(); } else if (m.type == MutationRef::ClearRange) { - storage->clear(KeyRangeRef(m.param1, m.param2)); + storage->clear(KeyRangeRef(m.param1, m.param2), &data->metrics); ++(*kvClearRanges); } } @@ -9932,7 +9932,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor ++data->counters.kvSystemClearRanges; // TODO(alexmiller): Figure out how to selectively enable spammy data distribution events. // DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange); - storage->clear(clearRange); + storage->clear(clearRange, &data->metrics); ++data->counters.kvSystemClearRanges; data->byteSampleApplyClear(clearRange, invalidVersion); }