From a6ce5c823ba766cb59e5d642c3255041634e3e8c Mon Sep 17 00:00:00 2001
From: Huisheng Liu
Date: Tue, 24 Mar 2020 11:21:10 -0700
Subject: [PATCH] multiget support for timestamps (#6483)

Summary:
Add timestamp support for MultiGet(). The timestamp in ReadOptions is
honored, and timestamps can be returned along with values.

A multireadrandom perf test (10 minutes) on a RAM drive of the same
development machine with the same DB data shows no regression (within
margin of error). The test is adapted from
https://github.com/facebook/rocksdb/wiki/RocksDB-In-Memory-Workload-Performance-Benchmarks.

baseline (commit 17bef7d3a):
multireadrandom : 104.173 micros/op 307167 ops/sec; (5462999 of 5462999 found)

This PR:
multireadrandom : 104.199 micros/op 307095 ops/sec; (5307999 of 5307999 found)

.\db_bench --db=r:\rocksdb.github --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --cache_size=2147483648 --cache_numshardbits=6 --compression_type=none --compression_ratio=1 --min_level_to_compress=-1 --disable_seek_compaction=1 --hard_rate_limit=2 --write_buffer_size=134217728 --max_write_buffer_number=2 --level0_file_num_compaction_trigger=8 --target_file_size_base=134217728 --max_bytes_for_level_base=1073741824 --disable_wal=0 --wal_dir=r:\rocksdb.github\WAL_LOG --sync=0 --verify_checksum=1 --statistics=0 --stats_per_interval=0 --stats_interval=1048576 --histogram=0 --use_plain_table=1 --open_files=-1 --memtablerep=prefix_hash --bloom_bits=10 --bloom_locality=1 --duration=600 --benchmarks=multireadrandom --use_existing_db=1 --num=25000000 --threads=32 --allow_concurrent_memtable_write=0

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6483

Reviewed By: anand1976

Differential Revision: D20498373

Pulled By: riversand963

fbshipit-source-id: 8505f22bc40fd791bc7dd05e48d7e67c91edb627
---
 db/db_basic_test.cc                | 129 +++++++++---------
 db/db_impl/db_impl.cc              |  50 +++++--
 db/db_impl/db_impl.h               |  16 +++
 db/db_with_timestamp_basic_test.cc | 109 +++++++++++++--
 db/memtable.cc                     |   6 +-
 db/version_set.cc                  |   4 +-
 include/rocksdb/db.h               |  65 +++++++++
 table/multiget_context.h           |   4 +-
 .../write_batch_with_index.cc      |   3 +-
 9 files changed, 295 insertions(+), 91 deletions(-)

diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index dedf938519..f2cfceae84 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -925,44 +925,44 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) {
 #endif
 
 class TestEnv : public EnvWrapper {
-  public:
-   explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
+ public:
+  explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
 
-   class TestLogger : public Logger {
-    public:
-     using Logger::Logv;
-     explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
-     ~TestLogger() override {
-       if (!closed_) {
-         CloseHelper();
-       }
-     }
-     void Logv(const char* /*format*/, va_list /*ap*/) override {}
-
-    protected:
-     Status CloseImpl() override { return CloseHelper(); }
-
-    private:
-     Status CloseHelper() {
-       env->CloseCountInc();
-       ;
-       return Status::IOError();
-     }
-     TestEnv* env;
-   };
-
-   void CloseCountInc() { close_count++; }
-
-   int GetCloseCount() { return close_count; }
-
-   Status NewLogger(const std::string& /*fname*/,
-                    std::shared_ptr<Logger>* result) override {
-     result->reset(new TestLogger(this));
-     return Status::OK();
+  class TestLogger : public Logger {
+   public:
+    using Logger::Logv;
+    explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
+    ~TestLogger() override {
+      if (!closed_) {
+        CloseHelper();
+      }
     }
+    void Logv(const char* /*format*/, va_list /*ap*/) override {}
+
+   protected:
+    Status CloseImpl() override { return CloseHelper(); }
 
   private:
-   int close_count;
+    Status CloseHelper() {
+      env->CloseCountInc();
+      ;
+      return Status::IOError();
+    }
+    TestEnv* env;
+  };
+
+  void CloseCountInc() { close_count++; }
+
+  int GetCloseCount() { return close_count; }
+
+  Status NewLogger(const std::string& /*fname*/,
+                   std::shared_ptr<Logger>* result) override {
+    result->reset(new TestLogger(this));
+    return Status::OK();
+  }
+
+ private:
+  int close_count;
 };
 
 TEST_F(DBBasicTest, DBClose) {
@@ -1014,7 +1014,7 @@ TEST_F(DBBasicTest, DBCloseFlushError) {
   Options options = GetDefaultOptions();
   options.create_if_missing = true;
   options.manual_wal_flush = true;
-  options.write_buffer_size=100;
+  options.write_buffer_size = 100;
   options.env = fault_injection_env.get();
 
   Reopen(options);
@@ -1464,7 +1464,8 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
   ASSERT_EQ(0, num_keys);
 
   for (int i = 0; i < 128; i += 9) {
-    ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+    ASSERT_OK(
+        Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
   }
 
   std::vector<std::string> keys;
@@ -1702,8 +1703,8 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
   BlockBasedTableOptions table_options;
   table_options.pin_l0_filter_and_index_blocks_in_cache = true;
   table_options.block_size = 16 * 1024;
-  assert(table_options.block_size >
-         BlockBasedTable::kMultiGetReadStackBufSize);
+  ASSERT_TRUE(table_options.block_size >
+              BlockBasedTable::kMultiGetReadStackBufSize);
   options.table_factory.reset(new BlockBasedTableFactory(table_options));
 
   Reopen(options);
@@ -1938,7 +1939,7 @@ class DBBasicTestWithParallelIO
     if (!Snappy_Supported()) {
       compression_enabled_ = false;
     }
-#endif  //ROCKSDB_LITE
+#endif  // ROCKSDB_LITE
 
     table_options.block_cache = uncompressed_cache_;
     if (table_options.block_cache == nullptr) {
@@ -2275,13 +2276,13 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
   ro.fill_cache = fill_cache();
 
   SyncPoint::GetInstance()->SetCallBack(
-      "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) {
-      Status* s = static_cast<Status*>(status);
-      read_count++;
-      if (read_count == 2) {
-        *s = Status::Corruption();
-      }
-  });
+      "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
+        Status* s = static_cast<Status*>(status);
+        read_count++;
+        if (read_count == 2) {
+          *s = Status::Corruption();
+        }
+      });
   SyncPoint::GetInstance()->EnableProcessing();
 
   // Warm up the cache first
@@ -2294,7 +2295,7 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                      keys.data(), values.data(), statuses.data(), true);
   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
-  //ASSERT_TRUE(CheckValue(50, values[1].ToString()));
+  // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
   ASSERT_EQ(statuses[0], Status::OK());
   ASSERT_EQ(statuses[1], Status::Corruption());
 
@@ -2312,10 +2313,10 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
   ro.fill_cache = fill_cache();
 
   SyncPoint::GetInstance()->SetCallBack(
-      "TableCache::MultiGet:FindTable", [&](void *status) {
-      Status* s = static_cast<Status*>(status);
-      *s = Status::IOError();
-  });
+      "TableCache::MultiGet:FindTable", [&](void* status) {
+        Status* s = static_cast<Status*>(status);
+        *s = Status::IOError();
+      });
   // DB open will create table readers unless we reduce the table cache
   // capacity.
   // SanitizeOptions will set max_open_files to minimum of 20. Table cache
@@ -2324,10 +2325,10 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
   // prevent file open during DB open and force the file to be opened
   // during MultiGet
   SyncPoint::GetInstance()->SetCallBack(
-      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) {
-      int* max_open_files = (int*)arg;
-      *max_open_files = 11;
-  });
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+        int* max_open_files = (int*)arg;
+        *max_open_files = 11;
+      });
   SyncPoint::GetInstance()->EnableProcessing();
 
   Reopen(CurrentOptions());
@@ -2347,15 +2348,15 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
   SyncPoint::GetInstance()->DisableProcessing();
 }
 
-INSTANTIATE_TEST_CASE_P(
-    ParallelIO, DBBasicTestWithParallelIO,
-    // Params are as follows -
-    // Param 0 - Compressed cache enabled
-    // Param 1 - Uncompressed cache enabled
-    // Param 2 - Data compression enabled
-    // Param 3 - ReadOptions::fill_cache
-    ::testing::Combine(::testing::Bool(), ::testing::Bool(),
-                       ::testing::Bool(), ::testing::Bool()));
+INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
+                        // Params are as follows -
+                        // Param 0 - Compressed cache enabled
+                        // Param 1 - Uncompressed cache enabled
+                        // Param 2 - Data compression enabled
+                        // Param 3 - ReadOptions::fill_cache
+                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
+                                           ::testing::Bool(),
+                                           ::testing::Bool()));
 
 }  // namespace ROCKSDB_NAMESPACE
 
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index af18ee8685..bc753223c9 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1687,12 +1687,20 @@ std::vector<Status> DBImpl::MultiGet(
     const ReadOptions& read_options,
     const std::vector<ColumnFamilyHandle*>& column_family,
     const std::vector<Slice>& keys, std::vector<std::string>* values) {
+  return MultiGet(read_options, column_family, keys, values,
+                  /*timestamps*/ nullptr);
+}
+
+std::vector<Status> DBImpl::MultiGet(
+    const ReadOptions& read_options,
+    const std::vector<ColumnFamilyHandle*>& column_family,
+    const std::vector<Slice>& keys, std::vector<std::string>* values,
+    std::vector<std::string>* timestamps) {
   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
   StopWatch sw(env_, stats_, DB_MULTIGET);
   PERF_TIMER_GUARD(get_snapshot_time);
 
   SequenceNumber consistent_seqnum;
-  ;
 
   std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
       column_family.size());
@@ -1723,6 +1731,9 @@ std::vector<Status> DBImpl::MultiGet(
   size_t num_keys = keys.size();
   std::vector<Status> stat_list(num_keys);
   values->resize(num_keys);
+  if (timestamps) {
+    timestamps->resize(num_keys);
+  }
 
   // Keep track of bytes that we read for statistics-recording later
   uint64_t bytes_read = 0;
@@ -1737,8 +1748,9 @@ std::vector<Status> DBImpl::MultiGet(
     merge_context.Clear();
     Status& s = stat_list[i];
     std::string* value = &(*values)[i];
+    std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr;
 
-    LookupKey lkey(keys[i], consistent_seqnum);
+    LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp);
     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
     SequenceNumber max_covering_tombstone_seq = 0;
     auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
@@ -1750,13 +1762,12 @@ std::vector<Status> DBImpl::MultiGet(
         has_unpersisted_data_.load(std::memory_order_relaxed));
     bool done = false;
     if (!skip_memtable) {
-      if (super_version->mem->Get(lkey, value, /*timestamp=*/nullptr, &s,
-                                  &merge_context, &max_covering_tombstone_seq,
-                                  read_options)) {
+      if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
+                                  &max_covering_tombstone_seq, read_options)) {
         done = true;
         RecordTick(stats_, MEMTABLE_HIT);
       } else if (super_version->imm->Get(
-                     lkey, value, nullptr, &s, &merge_context,
+                     lkey, value, timestamp, &s, &merge_context,
                      &max_covering_tombstone_seq, read_options)) {
         done = true;
         RecordTick(stats_, MEMTABLE_HIT);
@@ -1765,8 +1776,8 @@ std::vector<Status> DBImpl::MultiGet(
     if (!done) {
       PinnableSlice pinnable_val;
       PERF_TIMER_GUARD(get_from_output_files_time);
-      super_version->current->Get(read_options, lkey, &pinnable_val,
-                                  /*timestamp=*/nullptr, &s, &merge_context,
+      super_version->current->Get(read_options, lkey, &pinnable_val, timestamp,
+                                  &s, &merge_context,
                                   &max_covering_tombstone_seq);
       value->assign(pinnable_val.data(), pinnable_val.size());
       RecordTick(stats_, MEMTABLE_MISS);
@@ -1929,6 +1940,14 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
                       ColumnFamilyHandle** column_families, const Slice* keys,
                       PinnableSlice* values, Status* statuses,
                       const bool sorted_input) {
+  return MultiGet(read_options, num_keys, column_families, keys, values,
+                  /*timestamps*/ nullptr, statuses, sorted_input);
+}
+
+void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
+                      ColumnFamilyHandle** column_families, const Slice* keys,
+                      PinnableSlice* values, std::string* timestamps,
+                      Status* statuses, const bool sorted_input) {
   if (num_keys == 0) {
     return;
   }
@@ -1937,7 +1956,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
   sorted_keys.resize(num_keys);
   for (size_t i = 0; i < num_keys; ++i) {
     key_context.emplace_back(column_families[i], keys[i], &values[i],
-                             &statuses[i]);
+                             &timestamps[i], &statuses[i]);
   }
   for (size_t i = 0; i < num_keys; ++i) {
     sorted_keys[i] = &key_context[i];
@@ -2057,11 +2076,22 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const size_t num_keys,
                       const Slice* keys, PinnableSlice* values,
                       Status* statuses, const bool sorted_input) {
+  return MultiGet(read_options, column_family, num_keys, keys, values,
+                  /*timestamp=*/nullptr, statuses, sorted_input);
+}
+
+void DBImpl::MultiGet(const ReadOptions& read_options,
+                      ColumnFamilyHandle* column_family, const size_t num_keys,
+                      const Slice* keys, PinnableSlice* values,
+                      std::string* timestamps, Status* statuses,
+                      const bool sorted_input) {
   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
   sorted_keys.resize(num_keys);
   for (size_t i = 0; i < num_keys; ++i) {
-    key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
+    key_context.emplace_back(column_family, keys[i], &values[i],
+                             timestamps ? &timestamps[i] : nullptr,
+                             &statuses[i]);
   }
   for (size_t i = 0; i < num_keys; ++i) {
     sorted_keys[i] = &key_context[i];
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 978ee8c9c6..2a057a8735 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -188,6 +188,11 @@ class DBImpl : public DB {
       const std::vector<ColumnFamilyHandle*>& column_family,
       const std::vector<Slice>& keys,
       std::vector<std::string>* values) override;
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_family,
+      const std::vector<Slice>& keys, std::vector<std::string>* values,
+      std::vector<std::string>* timestamps) override;
 
   // This MultiGet is a batched version, which may be faster than calling Get
   // multiple times, especially if the keys have some spatial locality that
@@ -201,11 +206,22 @@ class DBImpl : public DB {
                         const size_t num_keys, const Slice* keys,
                         PinnableSlice* values, Status* statuses,
                         const bool sorted_input = false) override;
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, std::string* timestamps,
+                        Status* statuses,
+                        const bool sorted_input = false) override;
 
   virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
                         ColumnFamilyHandle** column_families, const Slice* keys,
                         PinnableSlice* values, Status* statuses,
                         const bool sorted_input = false) override;
+  virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
+                        ColumnFamilyHandle** column_families, const Slice* keys,
+                        PinnableSlice* values, std::string* timestamps,
+                        Status* statuses,
+                        const bool sorted_input = false) override;
 
   virtual void MultiGetWithCallback(
       const ReadOptions& options, ColumnFamilyHandle* column_family,
diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc
index 51678f787a..184f08b17f 100644
--- a/db/db_with_timestamp_basic_test.cc
+++ b/db/db_with_timestamp_basic_test.cc
@@ -580,7 +580,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
   std::vector<std::string> write_ts_list;
   std::vector<std::string> read_ts_list;
 
-  const auto& verify_record_func = [&](size_t i, size_t k,
+  const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
                                        ColumnFamilyHandle* cfh) {
     std::string value;
     std::string timestamp;
@@ -591,9 +591,11 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
     std::string expected_timestamp =
         std::string(write_ts_list[i].data(), write_ts_list[i].size());
 
-    ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, &timestamp));
-    ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value);
-    ASSERT_EQ(expected_timestamp, timestamp);
+    for (size_t j = begin; j <= end; ++j) {
+      ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, &timestamp));
+      ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value);
+      ASSERT_EQ(expected_timestamp, timestamp);
+    }
   };
 
   for (size_t i = 0; i != kNumTimestamps; ++i) {
@@ -609,9 +611,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
                          "value_" + std::to_string(j) + "_" + std::to_string(i),
                          wopts));
       if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
-        for (size_t k = memtable_get_start; k <= j; ++k) {
-          verify_record_func(i, k, handles_[cf]);
-        }
+        verify_records_func(i, memtable_get_start, j, handles_[cf]);
         memtable_get_start = j + 1;
 
         // flush all keys with the same timestamp to two sst files, split at
@@ -641,15 +641,104 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
                                     write_ts_list[i].size());
       for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
         ColumnFamilyHandle* cfh = handles_[cf];
-        for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
-          verify_record_func(i, j, cfh);
-        }
+        verify_records_func(i, 0, kNumKeysPerTimestamp - 1, cfh);
       }
     }
   };
   verify_db_func();
   Close();
 }
+
+TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
+  const int kNumKeysPerFile = 8192;
+  const size_t kNumTimestamps = 2;
+  const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.env = env_;
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+
+  size_t ts_sz = Timestamp(0, 0).size();
+  TestComparator test_cmp(ts_sz);
+  options.comparator = &test_cmp;
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(NewBloomFilterPolicy(
+      10 /*bits_per_key*/, false /*use_block_based_builder*/));
+  bbto.whole_key_filtering = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  size_t num_cfs = handles_.size();
+  ASSERT_EQ(2, num_cfs);
+  std::vector<std::string> write_ts_list;
+  std::vector<std::string> read_ts_list;
+
+  const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) {
+    std::vector<Slice> keys;
+    std::vector<std::string> key_vals;
+    std::vector<std::string> values;
+    std::vector<std::string> timestamps;
+
+    for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
+      key_vals.push_back(Key1(j));
+    }
+    for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
+      keys.push_back(key_vals[j]);
+    }
+
+    ReadOptions ropts;
+    const Slice read_ts = read_ts_list[i];
+    ropts.timestamp = &read_ts;
+    std::string expected_timestamp(write_ts_list[i].data(),
+                                   write_ts_list[i].size());
+
+    std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
+    std::vector<Status> statuses =
+        db_->MultiGet(ropts, cfhs, keys, &values, &timestamps);
+    for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
+      ASSERT_OK(statuses[j]);
+      ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
+                values[j]);
+      ASSERT_EQ(expected_timestamp, timestamps[j]);
+    }
+  };
+
+  for (size_t i = 0; i != kNumTimestamps; ++i) {
+    write_ts_list.push_back(Timestamp(i * 2, 0));
+    read_ts_list.push_back(Timestamp(1 + i * 2, 0));
+    const Slice& write_ts = write_ts_list.back();
+    for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
+      WriteOptions wopts;
+      WriteBatch batch(0, 0, ts_sz);
+      for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
+        ASSERT_OK(
+            batch.Put(handles_[cf], Key1(j),
+                      "value_" + std::to_string(j) + "_" + std::to_string(i)));
+      }
+      batch.AssignTimestamp(write_ts);
+      ASSERT_OK(db_->Write(wopts, &batch));
+
+      verify_records_func(i, handles_[cf]);
+
+      ASSERT_OK(Flush(cf));
+    }
+  }
+
+  const auto& verify_db_func = [&]() {
+    for (size_t i = 0; i != kNumTimestamps; ++i) {
+      ReadOptions ropts;
+      const Slice read_ts = read_ts_list[i];
+      ropts.timestamp = &read_ts;
+      for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
+        ColumnFamilyHandle* cfh = handles_[cf];
+        verify_records_func(i, cfh);
+      }
+    }
+  };
+  verify_db_func();
+  Close();
+}
+
 #endif  // !ROCKSDB_LITE
 
 INSTANTIATE_TEST_CASE_P(
diff --git a/db/memtable.cc b/db/memtable.cc
index 82b0cc9287..efc73b7cae 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -919,9 +919,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
     }
     GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
-                 callback, is_blob, iter->value->GetSelf(),
-                 /*timestamp=*/nullptr, iter->s, &(iter->merge_context), &seq,
-                 &found_final_value, &merge_in_progress);
+                 callback, is_blob, iter->value->GetSelf(), iter->timestamp,
+                 iter->s, &(iter->merge_context), &seq, &found_final_value,
+                 &merge_in_progress);
 
     if (!found_final_value && merge_in_progress) {
       *(iter->s) = Status::MergeInProgress();
diff --git a/db/version_set.cc b/db/version_set.cc
index 91d60f6151..c8f50b3337 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1897,8 +1897,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     get_ctx.emplace_back(
         user_comparator(), merge_operator_, info_log_, db_statistics_,
         iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
-        iter->value, /*timestamp*/ nullptr, nullptr, &(iter->merge_context),
-        true, &iter->max_covering_tombstone_seq, this->env_, nullptr,
+        iter->value, iter->timestamp, nullptr, &(iter->merge_context), true,
+        &iter->max_covering_tombstone_seq, this->env_, nullptr,
         merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
         tracing_mget_id);
     // MergeInProgress status, if set, has been transferred to the get_context
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index cbc17c2051..6de90fa0b0 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -480,6 +480,25 @@ class DB {
                     keys, values);
   }
 
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& /*options*/,
+      const std::vector<ColumnFamilyHandle*>& /*column_family*/,
+      const std::vector<Slice>& keys, std::vector<std::string>* /*values*/,
+      std::vector<std::string>* /*timestamps*/) {
+    return std::vector<Status>(
+        keys.size(), Status::NotSupported(
+                         "MultiGet() returning timestamps not implemented."));
+  }
+  virtual std::vector<Status> MultiGet(const ReadOptions& options,
+                                       const std::vector<Slice>& keys,
+                                       std::vector<std::string>* values,
+                                       std::vector<std::string>* timestamps) {
+    return MultiGet(
+        options,
+        std::vector<ColumnFamilyHandle*>(keys.size(), DefaultColumnFamily()),
+        keys, values, timestamps);
+  }
+
   // Overloaded MultiGet API that improves performance by batching operations
   // in the read path for greater efficiency. Currently, only the block based
   // table format with full filters are supported. Other table formats such
@@ -521,6 +540,30 @@ class DB {
     }
   }
 
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, std::string* timestamps,
+                        Status* statuses, const bool /*sorted_input*/ = false) {
+    std::vector<ColumnFamilyHandle*> cf;
+    std::vector<Slice> user_keys;
+    std::vector<Status> status;
+    std::vector<std::string> vals;
+    std::vector<std::string> tss;
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      cf.emplace_back(column_family);
+      user_keys.emplace_back(keys[i]);
+    }
+    status = MultiGet(options, cf, user_keys, &vals, &tss);
+    std::copy(status.begin(), status.end(), statuses);
+    std::copy(tss.begin(), tss.end(), timestamps);
+    for (auto& value : vals) {
+      values->PinSelf(value);
+      values++;
+    }
+  }
+
   // Overloaded MultiGet API that improves performance by batching operations
   // in the read path for greater efficiency. Currently, only the block based
   // table format with full filters are supported. Other table formats such
@@ -560,6 +603,28 @@ class DB {
       values++;
     }
   }
+  virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
+                        ColumnFamilyHandle** column_families, const Slice* keys,
+                        PinnableSlice* values, std::string* timestamps,
+                        Status* statuses, const bool /*sorted_input*/ = false) {
+    std::vector<ColumnFamilyHandle*> cf;
+    std::vector<Slice> user_keys;
+    std::vector<Status> status;
+    std::vector<std::string> vals;
+    std::vector<std::string> tss;
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      cf.emplace_back(column_families[i]);
+      user_keys.emplace_back(keys[i]);
+    }
+    status = MultiGet(options, cf, user_keys, &vals, &tss);
+    std::copy(status.begin(), status.end(), statuses);
+    std::copy(tss.begin(), tss.end(), timestamps);
+    for (auto& value : vals) {
+      values->PinSelf(value);
+      values++;
+    }
+  }
 
   // If the key definitely does not exist in the database, then this method
   // returns false, else true. If the caller wants to obtain value when the key
diff --git a/table/multiget_context.h b/table/multiget_context.h
index 0c5848c822..3d30bf200e 100644
--- a/table/multiget_context.h
+++ b/table/multiget_context.h
@@ -29,10 +29,11 @@ struct KeyContext {
   bool key_exists;
   void* cb_arg;
   PinnableSlice* value;
+  std::string* timestamp;
   GetContext* get_context;
 
   KeyContext(ColumnFamilyHandle* col_family, const Slice& user_key,
-             PinnableSlice* val, Status* stat)
+             PinnableSlice* val, std::string* ts, Status* stat)
       : key(&user_key),
         lkey(nullptr),
         column_family(col_family),
@@ -41,6 +42,7 @@ struct KeyContext {
         key_exists(false),
         cb_arg(nullptr),
         value(val),
+        timestamp(ts),
         get_context(nullptr) {}
 
   KeyContext() = default;
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 2df6bcaf38..675e9752d7 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -982,7 +982,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
     assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
            result == WriteBatchWithIndexInternal::Result::kNotFound);
 
-    key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
+    key_context.emplace_back(column_family, keys[i], &values[i],
+                             /*timestamp*/ nullptr, &statuses[i]);
     merges.emplace_back(result, std::move(merge_context));
   }
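
---

Usage sketch (not part of the diff above): a minimal example of calling the
new timestamp-returning MultiGet() overload added to include/rocksdb/db.h in
this patch. The handle `db`, the function name, and the keys "k1"/"k2" are
hypothetical; it assumes the DB was opened with a user comparator whose
timestamp size matches `read_ts`, as in the tests above.

  #include <string>
  #include <vector>
  #include "rocksdb/db.h"

  void MultiGetWithTimestamps(rocksdb::DB* db, const rocksdb::Slice& read_ts) {
    rocksdb::ReadOptions ropts;
    ropts.timestamp = &read_ts;  // read as of this timestamp; honored per this patch

    std::vector<rocksdb::Slice> keys{"k1", "k2"};  // hypothetical keys
    std::vector<std::string> values;
    std::vector<std::string> timestamps;  // one write timestamp per key

    // New overload against the default column family: statuses, values, and
    // timestamps are returned positionally, one entry per key.
    std::vector<rocksdb::Status> statuses =
        db->MultiGet(ropts, keys, &values, &timestamps);
    for (size_t i = 0; i < keys.size(); ++i) {
      if (statuses[i].ok()) {
        // values[i] holds the value; timestamps[i] holds the timestamp of the
        // matching write that is visible at read_ts.
      }
    }
  }

Passing a null `timestamps` pointer through the non-timestamp overloads keeps
the old behavior, which is how the forwarding wrappers in db_impl.cc preserve
compatibility for existing callers.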