#ifdef SSD_ROCKSDB_EXPERIMENTAL

#include <rocksdb/db.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/statistics.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "fdbserver/CoroFlow.h"
#include "flow/flow.h"
#include "flow/IThreadPool.h"
#include "flow/ThreadHelper.actor.h"

#include <memory>
#include <tuple>
#include <vector>

#endif // SSD_ROCKSDB_EXPERIMENTAL

#include "fdbserver/IKeyValueStore.h"
#include "flow/actorcompiler.h" // has to be last include

#ifdef SSD_ROCKSDB_EXPERIMENTAL

namespace {

rocksdb::Slice toSlice(StringRef s) {
	return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
}

StringRef toStringRef(rocksdb::Slice s) {
	return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
}

rocksdb::ColumnFamilyOptions getCFOptions() {
	rocksdb::ColumnFamilyOptions options;
	options.level_compaction_dynamic_level_bytes = true;
	options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
	if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) {
		options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS;
	}
	// Compact sstables when there's too much deleted stuff.
	options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
	return options;
}

rocksdb::Options getOptions() {
	rocksdb::Options options({}, getCFOptions());
	options.avoid_unnecessary_blocking_io = true;
	options.create_if_missing = true;
	if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
		options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
	}

	options.statistics = rocksdb::CreateDBStatistics();
	options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);

	rocksdb::BlockBasedTableOptions bbOpts;
	// TODO: Add a knob for the block cache size. (Default is 8 MB)
	if (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0) {
		// Prefix blooms are used during Seek.
		options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(SERVER_KNOBS->ROCKSDB_PREFIX_LEN));

		// Also turn on bloom filters in the memtable.
		// TODO: Make a knob for this as well.
		options.memtable_prefix_bloom_size_ratio = 0.1;

		// 5 -- Can be read by RocksDB versions since 6.6.0. Full and partitioned
		// filters use a generally faster and more accurate Bloom filter
		// implementation, with a different schema.
		// https://github.com/facebook/rocksdb/blob/b77569f18bfc77fb1d8a0b3218f6ecf571bc4988/include/rocksdb/table.h#L391
		bbOpts.format_version = 5;

		// Create and apply a bloom filter using 10 bits per key,
		// which should yield a ~1% false positive rate:
		// https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
		bbOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));

		// Whole-key blooms are only used for point lookups.
		// https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
		bbOpts.whole_key_filtering = false;
	}

	if (SERVER_KNOBS->ROCKSDB_BLOCK_CACHE_SIZE > 0) {
		bbOpts.block_cache = rocksdb::NewLRUCache(SERVER_KNOBS->ROCKSDB_BLOCK_CACHE_SIZE);
	}

	options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbOpts));
	return options;
}
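// A worked check of the filter sizing above (illustrative only, not used by
// the engine): with an optimal number of hash probes, a bloom filter's false
// positive rate is approximately 0.6185^(bits per key), so the 10 bits per key
// configured in getOptions() give about 0.6185^10 ≈ 0.008, i.e. just under the
// ~1% figure cited in the comment.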
// Set some useful defaults desired for all reads.
rocksdb::ReadOptions getReadOptions() {
	rocksdb::ReadOptions options;
	options.background_purge_on_iterator_cleanup = true;
	return options;
}

ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics, rocksdb::DB* db) {
	state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
		{ "StallMicros", rocksdb::STALL_MICROS, 0 },
		{ "BytesRead", rocksdb::BYTES_READ, 0 },
		{ "IterBytesRead", rocksdb::ITER_BYTES_READ, 0 },
		{ "BytesWritten", rocksdb::BYTES_WRITTEN, 0 },
		{ "BlockCacheMisses", rocksdb::BLOCK_CACHE_MISS, 0 },
		{ "BlockCacheHits", rocksdb::BLOCK_CACHE_HIT, 0 },
		{ "BloomFilterUseful", rocksdb::BLOOM_FILTER_USEFUL, 0 },
		{ "BloomFilterFullPositive", rocksdb::BLOOM_FILTER_FULL_POSITIVE, 0 },
		{ "BloomFilterTruePositive", rocksdb::BLOOM_FILTER_FULL_TRUE_POSITIVE, 0 },
		{ "BloomFilterMicros", rocksdb::BLOOM_FILTER_MICROS, 0 },
		{ "MemtableHit", rocksdb::MEMTABLE_HIT, 0 },
		{ "MemtableMiss", rocksdb::MEMTABLE_MISS, 0 },
		{ "GetHitL0", rocksdb::GET_HIT_L0, 0 },
		{ "GetHitL1", rocksdb::GET_HIT_L1, 0 },
		{ "GetHitL2AndUp", rocksdb::GET_HIT_L2_AND_UP, 0 },
		{ "CountKeysWritten", rocksdb::NUMBER_KEYS_WRITTEN, 0 },
		{ "CountKeysRead", rocksdb::NUMBER_KEYS_READ, 0 },
		{ "CountDBSeek", rocksdb::NUMBER_DB_SEEK, 0 },
		{ "CountDBNext", rocksdb::NUMBER_DB_NEXT, 0 },
		{ "CountDBPrev", rocksdb::NUMBER_DB_PREV, 0 },
		{ "BloomFilterPrefixChecked", rocksdb::BLOOM_FILTER_PREFIX_CHECKED, 0 },
		{ "BloomFilterPrefixUseful", rocksdb::BLOOM_FILTER_PREFIX_USEFUL, 0 },
		{ "BlockCacheCompressedMiss", rocksdb::BLOCK_CACHE_COMPRESSED_MISS, 0 },
		{ "BlockCacheCompressedHit", rocksdb::BLOCK_CACHE_COMPRESSED_HIT, 0 },
		{ "CountWalFileSyncs", rocksdb::WAL_FILE_SYNCED, 0 },
		{ "CountWalFileBytes", rocksdb::WAL_FILE_BYTES, 0 },
		{ "CompactReadBytes", rocksdb::COMPACT_READ_BYTES, 0 },
		{ "CompactWriteBytes", rocksdb::COMPACT_WRITE_BYTES, 0 },
		{ "FlushWriteBytes", rocksdb::FLUSH_WRITE_BYTES, 0 },
		{ "CountBlocksCompressed", rocksdb::NUMBER_BLOCK_COMPRESSED, 0 },
		{ "CountBlocksDecompressed", rocksdb::NUMBER_BLOCK_DECOMPRESSED, 0 },
		{ "RowCacheHit", rocksdb::ROW_CACHE_HIT, 0 },
		{ "RowCacheMiss", rocksdb::ROW_CACHE_MISS, 0 },
		{ "CountIterSkippedKeys", rocksdb::NUMBER_ITER_SKIP, 0 },
	};
	state std::vector<std::pair<const char*, std::string>> propertyStats = {
		{ "NumCompactionsRunning", rocksdb::DB::Properties::kNumRunningCompactions },
		{ "NumImmutableMemtables", rocksdb::DB::Properties::kNumImmutableMemTable },
		{ "NumImmutableMemtablesFlushed", rocksdb::DB::Properties::kNumImmutableMemTableFlushed },
		{ "IsMemtableFlushPending", rocksdb::DB::Properties::kMemTableFlushPending },
		{ "NumRunningFlushes", rocksdb::DB::Properties::kNumRunningFlushes },
		{ "IsCompactionPending", rocksdb::DB::Properties::kCompactionPending },
		{ "NumRunningCompactions", rocksdb::DB::Properties::kNumRunningCompactions },
		{ "CumulativeBackgroundErrors", rocksdb::DB::Properties::kBackgroundErrors },
		{ "CurrentSizeActiveMemtable", rocksdb::DB::Properties::kCurSizeActiveMemTable },
		{ "AllMemtablesBytes", rocksdb::DB::Properties::kCurSizeAllMemTables },
		{ "ActiveMemtableBytes", rocksdb::DB::Properties::kSizeAllMemTables },
		{ "CountEntriesActiveMemtable", rocksdb::DB::Properties::kNumEntriesActiveMemTable },
		{ "CountEntriesImmutMemtables", rocksdb::DB::Properties::kNumEntriesImmMemTables },
		{ "CountDeletesActiveMemtable", rocksdb::DB::Properties::kNumDeletesActiveMemTable },
		{ "CountDeletesImmutMemtables", rocksdb::DB::Properties::kNumDeletesImmMemTables },
		{ "EstimatedCountKeys", rocksdb::DB::Properties::kEstimateNumKeys },
		{ "EstimateSstReaderBytes", rocksdb::DB::Properties::kEstimateTableReadersMem },
		{ "CountActiveSnapshots", rocksdb::DB::Properties::kNumSnapshots },
		{ "OldestSnapshotTime", rocksdb::DB::Properties::kOldestSnapshotTime },
		{ "CountLiveVersions", rocksdb::DB::Properties::kNumLiveVersions },
		{ "EstimateLiveDataSize", rocksdb::DB::Properties::kEstimateLiveDataSize },
		{ "BaseLevel", rocksdb::DB::Properties::kBaseLevel },
		{ "EstPendCompactBytes", rocksdb::DB::Properties::kEstimatePendingCompactionBytes },
	};

	loop {
		wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
		TraceEvent e("RocksDBMetrics");
		for (auto& t : tickerStats) {
			auto& [name, ticker, cum] = t;
			uint64_t val = statistics->getTickerCount(ticker);
			e.detail(name, val - cum);
			cum = val;
		}

		for (auto& p : propertyStats) {
			auto& [name, property] = p;
			uint64_t stat = 0;
			ASSERT(db->GetIntProperty(property, &stat));
			e.detail(name, stat);
		}
	}
}
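// Note on the logger above: RocksDB tickers are cumulative counters over the
// lifetime of the process, so the third tuple element caches the previous
// sample and each trace event reports the per-interval delta (e.g. successive
// ticker readings of 1000 and 1500 produce events of 1000 and then 500). The
// properties, by contrast, are point-in-time gauges and are reported as-is.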
struct RocksDBKeyValueStore : IKeyValueStore {
	using DB = rocksdb::DB*;
	using CF = rocksdb::ColumnFamilyHandle*;

	struct Writer : IThreadPoolReceiver {
		DB& db;
		UID id;

		explicit Writer(DB& db, UID id) : db(db), id(id) {}

		~Writer() override {
			if (db) {
				delete db;
			}
		}

		void init() override {}

		Error statusToError(const rocksdb::Status& s) {
			if (s == rocksdb::Status::IOError()) {
				return io_error();
			} else {
				return unknown_error();
			}
		}

		struct OpenAction : TypedAction<Writer, OpenAction> {
			std::string path;
			ThreadReturnPromise<Void> done;
			Optional<Future<Void>>& metrics;

			OpenAction(std::string path, Optional<Future<Void>>& metrics) : path(std::move(path)), metrics(metrics) {}

			double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
		};
		void action(OpenAction& a) {
			std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
				"default", getCFOptions() } };
			std::vector<rocksdb::ColumnFamilyHandle*> handle;
			auto options = getOptions();
			auto status = rocksdb::DB::Open(options, a.path, defaultCF, &handle, &db);
			if (!status.ok()) {
				TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open");
				a.done.sendError(statusToError(status));
			} else {
				TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Open");
				onMainThread([&] {
					a.metrics = rocksDBMetricLogger(options.statistics, db);
					return Future<bool>(true);
				}).blockUntilReady();
				a.done.send(Void());
			}
		}

		struct DeleteVisitor : public rocksdb::WriteBatch::Handler {
			VectorRef<KeyRangeRef>& deletes;
			Arena& arena;

			DeleteVisitor(VectorRef<KeyRangeRef>& deletes, Arena& arena) : deletes(deletes), arena(arena) {}

			rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/,
			                              const rocksdb::Slice& begin,
			                              const rocksdb::Slice& end) override {
				KeyRangeRef kr(toStringRef(begin), toStringRef(end));
				deletes.push_back_deep(arena, kr);
				return rocksdb::Status::OK();
			}
		};

		struct CommitAction : TypedAction<Writer, CommitAction> {
			std::unique_ptr<rocksdb::WriteBatch> batchToCommit;
			ThreadReturnPromise<Void> done;
			double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
		};
		void action(CommitAction& a) {
			Standalone<VectorRef<KeyRangeRef>> deletes;
			DeleteVisitor dv(deletes, deletes.arena());
			ASSERT(a.batchToCommit->Iterate(&dv).ok());
			// If there are any range deletes, we should have added them to be deleted.
			ASSERT(!deletes.empty() || !a.batchToCommit->HasDeleteRange());
			rocksdb::WriteOptions options;
			options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
			auto s = db->Write(options, a.batchToCommit.get());
			if (!s.ok()) {
				TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Commit");
				a.done.sendError(statusToError(s));
			} else {
				a.done.send(Void());
				for (const auto& keyRange : deletes) {
					auto begin = toSlice(keyRange.begin);
					auto end = toSlice(keyRange.end);
					ASSERT(db->SuggestCompactRange(db->DefaultColumnFamily(), &begin, &end).ok());
				}
			}
		}
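		// Sketch of the commit flow above (illustrative): a batch built with
		// WriteBatch::DeleteRange("a", "b") triggers one DeleteRangeCF() callback
		// while Iterate() walks the batch, so `deletes` collects ["a", "b"). After
		// a successful Write(), each collected range is handed to
		// SuggestCompactRange() so that the range tombstone, and the dead data
		// under it, is compacted away promptly instead of lingering and slowing
		// down later scans.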
		struct CloseAction : TypedAction<Writer, CloseAction> {
			ThreadReturnPromise<Void> done;
			std::string path;
			bool deleteOnClose;

			CloseAction(std::string path, bool deleteOnClose) : path(path), deleteOnClose(deleteOnClose) {}

			double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
		};
		void action(CloseAction& a) {
			if (db == nullptr) {
				a.done.send(Void());
				return;
			}
			auto s = db->Close();
			if (!s.ok()) {
				TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close");
			}
			if (a.deleteOnClose) {
				std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
					"default", getCFOptions() } };
				s = rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
				if (!s.ok()) {
					TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Destroy");
				} else {
					TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Destroy");
				}
			}
			TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Close");
			a.done.send(Void());
		}
	};

	struct Reader : IThreadPoolReceiver {
		DB& db;

		explicit Reader(DB& db) : db(db) {}

		void init() override {}

		struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
			Key key;
			Optional<UID> debugID;
			ThreadReturnPromise<Optional<Value>> result;

			ReadValueAction(KeyRef key, Optional<UID> debugID) : key(key), debugID(debugID) {}

			double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
		};
		void action(ReadValueAction& a) {
			Optional<TraceBatch> traceBatch;
			if (a.debugID.present()) {
				traceBatch = { TraceBatch{} };
				traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
			}
			rocksdb::PinnableSlice value;
			auto s = db->Get(getReadOptions(), db->DefaultColumnFamily(), toSlice(a.key), &value);
			if (a.debugID.present()) {
				traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
				traceBatch.get().dump();
			}
			if (s.ok()) {
				a.result.send(Value(toStringRef(value)));
			} else {
				if (!s.IsNotFound()) {
					TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue");
				}
				a.result.send(Optional<Value>());
			}
		}

		struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
			Key key;
			int maxLength;
			Optional<UID> debugID;
			ThreadReturnPromise<Optional<Value>> result;

			ReadValuePrefixAction(Key key, int maxLength, Optional<UID> debugID)
			  : key(key), maxLength(maxLength), debugID(debugID) {}

			double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
		};
		void action(ReadValuePrefixAction& a) {
			rocksdb::PinnableSlice value;
			Optional<TraceBatch> traceBatch;
			if (a.debugID.present()) {
				traceBatch = { TraceBatch{} };
				traceBatch.get().addEvent("GetValuePrefixDebug",
				                          a.debugID.get().first(),
				                          "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
			}
			auto s = db->Get(getReadOptions(), db->DefaultColumnFamily(), toSlice(a.key), &value);
			if (a.debugID.present()) {
				traceBatch.get().addEvent("GetValuePrefixDebug",
				                          a.debugID.get().first(),
				                          "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
				traceBatch.get().dump();
			}
			if (s.ok()) {
				a.result.send(Value(StringRef(reinterpret_cast<const uint8_t*>(value.data()),
				                              std::min(value.size(), size_t(a.maxLength)))));
			} else {
				if (!s.IsNotFound()) {
					TraceEvent(SevError, "RocksDBError")
					    .detail("Error", s.ToString())
					    .detail("Method", "ReadValuePrefix");
				}
				a.result.send(Optional<Value>());
			}
		}
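		// Note: rocksdb::DB::Get() always materializes the whole value, so
		// ReadValuePrefixAction truncates to maxLength on this side of the API;
		// the bytes beyond maxLength are read and then discarded.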
		struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
			KeyRange keys;
			int rowLimit, byteLimit;
			ThreadReturnPromise<RangeResult> result;

			ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit)
			  : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}

			double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
		};
		void action(ReadRangeAction& a) {
			RangeResult result;
			if (a.rowLimit == 0 || a.byteLimit == 0) {
				a.result.send(result);
				return; // Don't fall through and send the promise a second time.
			}
			int accumulatedBytes = 0;
			rocksdb::Status s;
			auto options = getReadOptions();
			// When using a prefix extractor, ensure that keys are returned in order even if they cross
			// a prefix boundary.
			options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
			if (a.rowLimit >= 0) {
				auto endSlice = toSlice(a.keys.end);
				options.iterate_upper_bound = &endSlice;
				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
				cursor->Seek(toSlice(a.keys.begin));
				while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
					KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
					accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
					result.push_back_deep(result.arena(), kv);
					// Calling `cursor->Next()` is potentially expensive, so short-circuit here just in case.
					if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
						break;
					}
					cursor->Next();
				}
				s = cursor->status();
			} else {
				auto beginSlice = toSlice(a.keys.begin);
				options.iterate_lower_bound = &beginSlice;
				auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
				cursor->SeekForPrev(toSlice(a.keys.end));
				if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
					cursor->Prev();
				}
				while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
					KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
					accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
					result.push_back_deep(result.arena(), kv);
					// Calling `cursor->Prev()` is potentially expensive, so short-circuit here just in case.
					if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
						break;
					}
					cursor->Prev();
				}
				s = cursor->status();
			}
			if (!s.ok()) {
				TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
			}
			result.more =
			    (result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
			if (result.more) {
				result.readThrough = result[result.size() - 1].key;
			}
			a.result.send(result);
		}
	};
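	// Convention used by ReadRangeAction above: rowLimit > 0 scans forward from
	// keys.begin and returns at most rowLimit rows, while rowLimit < 0 scans
	// backward from keys.end and returns at most -rowLimit rows. For example, a
	// caller passing rowLimit = -1 gets just the last key-value pair in the range.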
	DB db = nullptr;
	std::string path;
	UID id;
	Reference<IThreadPool> writeThread;
	Reference<IThreadPool> readThreads;
	Promise<Void> errorPromise;
	Promise<Void> closePromise;
	Future<Void> openFuture;
	std::unique_ptr<rocksdb::WriteBatch> writeBatch;
	Optional<Future<Void>> metrics;

	explicit RocksDBKeyValueStore(const std::string& path, UID id) : path(path), id(id) {
		// In simulation, run the reader/writer threads as Coro threads (i.e. in the network thread). The storage
		// engine is still multi-threaded as background compaction threads are still present. Reads/writes to disk
		// will also block the network thread in a way that would be unacceptable in production but is a necessary
		// evil here. When performing the reads in background threads in simulation, the event loop thinks there is
		// no work to do and advances time faster than 1 sec/sec. By the time the blocking read actually finishes,
		// simulation has advanced time by more than 5 seconds, so every read fails with a transaction_too_old
		// error. Doing blocking IO on the main thread solves this issue. There are almost certainly better fixes,
		// but my goal was to get a less invasive change merged first and work on a more realistic version if/when
		// we think that would provide substantially more confidence in the correctness.
		// TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are
		// occurring.
		if (g_network->isSimulated()) {
			writeThread = CoroThreadPool::createThreadPool();
			readThreads = CoroThreadPool::createThreadPool();
		} else {
			writeThread = createGenericThreadPool();
			readThreads = createGenericThreadPool();
		}
		writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr");
		for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
			readThreads->addThread(new Reader(db), "fdb-rocksdb-re");
		}
	}

	Future<Void> getError() override { return errorPromise.getFuture(); }

	ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
		// The metrics future retains a reference to the DB, so stop it before we delete it.
		self->metrics.reset();

		wait(self->readThreads->stop());
		auto a = new Writer::CloseAction(self->path, deleteOnClose);
		auto f = a->done.getFuture();
		self->writeThread->post(a);
		wait(f);
		wait(self->writeThread->stop());
		if (self->closePromise.canBeSet())
			self->closePromise.send(Void());
		if (self->errorPromise.canBeSet())
			self->errorPromise.send(Never());
		delete self;
	}

	Future<Void> onClosed() override { return closePromise.getFuture(); }

	void dispose() override { doClose(this, true); }

	void close() override { doClose(this, false); }

	KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }

	Future<Void> init() override {
		if (openFuture.isValid()) {
			return openFuture;
		}
		auto a = std::make_unique<Writer::OpenAction>(path, metrics);
		openFuture = a->done.getFuture();
		writeThread->post(a.release());
		return openFuture;
	}

	void set(KeyValueRef kv, const Arena*) override {
		if (writeBatch == nullptr) {
			writeBatch.reset(new rocksdb::WriteBatch());
		}
		writeBatch->Put(toSlice(kv.key), toSlice(kv.value));
	}

	void clear(KeyRangeRef keyRange, const Arena*) override {
		if (writeBatch == nullptr) {
			writeBatch.reset(new rocksdb::WriteBatch());
		}
		writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end));
	}
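	// Usage sketch (illustrative): mutations accumulate in a single WriteBatch
	// and become durable in one atomic rocksdb::DB::Write() issued by commit()
	// below on the writer thread:
	//
	//   store->set(KeyValueRef(key, value));
	//   store->clear(KeyRangeRef(begin, end));
	//   wait(store->commit(false)); // posts one CommitAction carrying the batch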
	Future<Void> commit(bool) override {
		// If there is nothing to write, don't write.
		if (writeBatch == nullptr) {
			return Void();
		}
		auto a = new Writer::CommitAction();
		a->batchToCommit = std::move(writeBatch);
		auto res = a->done.getFuture();
		writeThread->post(a);
		return res;
	}

	Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override {
		auto a = new Reader::ReadValueAction(key, debugID);
		auto res = a->result.getFuture();
		readThreads->post(a);
		return res;
	}

	Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override {
		auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
		auto res = a->result.getFuture();
		readThreads->post(a);
		return res;
	}

	Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override {
		auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
		auto res = a->result.getFuture();
		readThreads->post(a);
		return res;
	}

	StorageBytes getStorageBytes() const override {
		uint64_t live = 0;
		ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kLiveSstFilesSize, &live));

		int64_t free;
		int64_t total;
		g_network->getDiskBytes(path, free, total);

		return StorageBytes(free, total, live, free);
	}
};

} // namespace

#endif // SSD_ROCKSDB_EXPERIMENTAL

IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
                                     UID logID,
                                     KeyValueStoreType storeType,
                                     bool checkChecksums,
                                     bool checkIntegrity) {
#ifdef SSD_ROCKSDB_EXPERIMENTAL
	return new RocksDBKeyValueStore(path, logID);
#else
	TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB");
	ASSERT(false);
	return nullptr;
#endif // SSD_ROCKSDB_EXPERIMENTAL
}

#ifdef SSD_ROCKSDB_EXPERIMENTAL
#include "flow/UnitTest.h"

namespace {

TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/Reopen") {
	state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db";
	platform::eraseDirectoryRecursive(rocksDBTestDir);

	state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
	wait(kvStore->init());

	kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") });
	wait(kvStore->commit(false));

	Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
	ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);

	Future<Void> closed = kvStore->onClosed();
	kvStore->close();
	wait(closed);

	kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
	wait(kvStore->init());
	// Confirm that `init()` is idempotent.
	wait(kvStore->init());

	Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
	ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);

	Future<Void> closed = kvStore->onClosed();
	kvStore->close();
	wait(closed);

	platform::eraseDirectoryRecursive(rocksDBTestDir);
	return Void();
}

} // namespace

#endif // SSD_ROCKSDB_EXPERIMENTAL