From 9d77bf8f7b772495d7b615f9fc6624ed75f8f637 Mon Sep 17 00:00:00 2001
From: Changyu Bi <changyubi@fb.com>
Date: Fri, 5 Aug 2022 12:02:33 -0700
Subject: [PATCH] Fragment memtable range tombstone in the write path (#10380)

Summary:
- Currently, every read fragments the memtable range tombstones (https://github.com/facebook/rocksdb/issues/4808). This PR explores the idea of fragmenting memtable range tombstones in the write path, so that reads can use the cached fragmented tombstone list without paying any fragmenting cost. This PR only does the caching for immutable memtables, and does so right before a memtable is added to the immutable memtable list. The fragmentation is done without holding the mutex to minimize its performance impact. A short sketch of the resulting write/read paths follows this list.
- db_bench is updated to print out the number of range deletions executed, if there are any.
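A simplified, non-runnable sketch of how the pieces fit together: `ConstructFragmentedRangeTombstones()` and the `immutable_memtable` parameter are the actual additions in the diff below, while the surrounding variables (`cfd`, `m`, `read_opts`, `read_seq`, `to_delete`) stand in for the usual RocksDB internals at each call site.
```
// Write path (e.g. DBImpl::SwitchMemtable): fragment the range tombstones
// once, right before the memtable is added to the immutable memtable list.
// The DB mutex is not held here, and the thread is at the front of the
// writer queue, so there are no concurrent writes to this memtable.
cfd->mem()->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(cfd->mem(), &to_delete);

// Read path: callers that know they are reading an immutable memtable pass
// immutable_memtable = true and get an iterator over the cached
// FragmentedRangeTombstoneList instead of re-fragmenting on every read.
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
    m->NewRangeTombstoneIterator(read_opts, read_seq,
                                 true /* immutable_memtable */));
```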

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10380

Test Plan:
- CI; asserts were added in various places to check that a fragmented range tombstone list has been constructed where expected.
- Benchmark: since this PR only optimizes the immutable memtable path, the number of writes in the benchmark is chosen such that an immutable memtable is created and range tombstones are in that memtable.

```
single thread:
./db_bench --benchmarks=fillrandom,readrandom --writes_per_range_tombstone=1 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=500000 --reads=100000 --max_num_range_tombstones=100

multi-thread:
./db_bench --benchmarks=fillrandom,readrandom --writes_per_range_tombstone=1 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=15000 --reads=20000 --threads=32 --max_num_range_tombstones=100
```
Commit 99cdf16464a057ca44de2f747541dedf651bae9e is included in the benchmark results. It was an earlier attempt that fragmented the tombstones on each write operation; reader threads share the list through a shared_ptr, which slows down multi-threaded read performance as seen in the benchmark results.
Results are averaged over 5 runs.

Single-thread results:
| Max # tombstones | fillrandom micros/op (main) | fillrandom (99cdf16464a057ca44de2f747541dedf651bae9e) | fillrandom (post-PR) | readrandom micros/op (main) | readrandom (99cdf16464a057ca44de2f747541dedf651bae9e) | readrandom (post-PR) |
| ------------- | ------------- |------------- |------------- |------------- |------------- |------------- |
| 0    |6.68     |6.57     |6.72     |4.72     |4.79     |4.54     |
| 1    |6.67     |6.58     |6.62     |5.41     |4.74     |4.72     |
| 10   |6.59     |6.5      |6.56     |7.83     |4.69     |4.59     |
| 100  |6.62     |6.75     |6.58     |29.57    |5.04     |5.09     |
| 1000 |6.54     |6.82     |6.61     |320.33   |5.22     |5.21     |

32-thread results (note that "Max # tombstones" is per thread):
| Max # tombstones | fillrandom micros/op (main) | fillrandom (99cdf16464a057ca44de2f747541dedf651bae9e) | fillrandom (post-PR) | readrandom micros/op (main) | readrandom (99cdf16464a057ca44de2f747541dedf651bae9e) | readrandom (post-PR) |
| ------------- | ------------- |------------- |------------- |------------- |------------- |------------- |
| 0    |234.52   |260.25   |239.42   |5.06     |5.38     |5.09     |
| 1    |236.46   |262.0    |231.1    |19.57    |22.14    |5.45     |
| 10   |236.95   |263.84   |251.49   |151.73   |21.61    |5.73     |
| 100  |268.16   |296.8    |280.13   |2308.52  |22.27    |6.57     |

Reviewed By: ajkr

Differential Revision: D37916564

Pulled By: cbi42

fbshipit-source-id: 05d6d2e16df26c374c57ddcca13a5bfe9d5b731e
---
 HISTORY.md                      |  3 ++
 db/arena_wrapped_db_iter.cc     |  4 +-
 db/column_family.cc             |  4 +-
 db/db_impl/db_impl.cc           | 17 +++++---
 db/db_impl/db_impl_open.cc      |  6 ++-
 db/db_impl/db_impl_readonly.cc  |  3 +-
 db/db_impl/db_impl_secondary.cc |  4 +-
 db/db_impl/db_impl_write.cc     |  4 ++
 db/db_memtable_test.cc          |  3 +-
 db/flush_job.cc                 | 15 ++++---
 db/flush_job_test.cc            |  8 +++-
 db/forward_iterator.cc          |  6 ++-
 db/memtable.cc                  | 44 ++++++++++++++++----
 db/memtable.h                   | 74 ++++++++++++++++++++++++++-------
 db/memtable_list.cc             | 22 ++++++----
 db/memtable_list_test.cc        | 40 +++++++++++++-----
 db/repair.cc                    |  4 +-
 db/write_batch_test.cc          |  3 +-
 table/table_test.cc             |  3 +-
 tools/db_bench_tool.cc          |  6 +++
 20 files changed, 208 insertions(+), 65 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index c524180f4e..2f6adf2356 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -28,6 +28,9 @@
 * PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env.
 * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly.
 
+### Performance Improvements
+* Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables.  
+
 ## 7.5.0 (07/15/2022)
 ### New Features
 * Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.
diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc
index bbb2b7493e..c14217f621 100644
--- a/db/arena_wrapped_db_iter.cc
+++ b/db/arena_wrapped_db_iter.cc
@@ -89,8 +89,8 @@ Status ArenaWrappedDBIter::Refresh() {
         ReadRangeDelAggregator* range_del_agg =
             db_iter_->GetRangeDelAggregator();
         std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
-        range_del_iter.reset(
-            sv->mem->NewRangeTombstoneIterator(read_options_, latest_seq));
+        range_del_iter.reset(sv->mem->NewRangeTombstoneIterator(
+            read_options_, latest_seq, false /* immutable_memtable */));
         range_del_agg->AddTombstones(std::move(range_del_iter));
         cfd_->ReturnThreadLocalSuperVersion(sv);
       }
diff --git a/db/column_family.cc b/db/column_family.cc
index 142f54a23b..95cb7cee45 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -1149,8 +1149,8 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
 
   auto read_seq = super_version->current->version_set()->LastSequence();
   ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
-  auto* active_range_del_iter =
-      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
+  auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
+      read_opts, read_seq, false /* immutable_memtable */);
   range_del_agg.AddTombstones(
       std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
   Status status;
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 38033b6e0c..44ace3745f 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1761,8 +1761,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
   std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
   Status s;
   if (!read_options.ignore_range_deletions) {
-    range_del_iter.reset(
-        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
+    range_del_iter.reset(super_version->mem->NewRangeTombstoneIterator(
+        read_options, sequence, false /* immutable_memtable */));
     range_del_agg->AddTombstones(std::move(range_del_iter));
   }
   // Collect all needed child iterators for immutable memtables
@@ -1982,7 +1982,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
     if (get_impl_options.get_value) {
       if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
                        &merge_context, &max_covering_tombstone_seq,
-                       read_options, get_impl_options.callback,
+                       read_options, false /* immutable_memtable */,
+                       get_impl_options.callback,
                        get_impl_options.is_blob_index)) {
         done = true;
         get_impl_options.value->PinSelf();
@@ -2002,7 +2003,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
       // merged and raw values should be returned to the user.
       if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
                        &merge_context, &max_covering_tombstone_seq,
-                       read_options, nullptr, nullptr, false)) {
+                       read_options, false /* immutable_memtable */, nullptr,
+                       nullptr, false)) {
         done = true;
         RecordTick(stats_, MEMTABLE_HIT);
       } else if ((s.ok() || s.IsMergeInProgress()) &&
@@ -2252,6 +2254,7 @@ std::vector<Status> DBImpl::MultiGet(
     if (!skip_memtable) {
       if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
                                   &max_covering_tombstone_seq, read_options,
+                                  false /* immutable_memtable */,
                                   read_callback)) {
         done = true;
         RecordTick(stats_, MEMTABLE_HIT);
@@ -2788,7 +2791,8 @@ Status DBImpl::MultiGetImpl(
         (read_options.read_tier == kPersistedTier &&
          has_unpersisted_data_.load(std::memory_order_relaxed));
     if (!skip_memtable) {
-      super_version->mem->MultiGet(read_options, &range, callback);
+      super_version->mem->MultiGet(read_options, &range, callback,
+                                   false /* immutable_memtable */);
       if (!range.empty()) {
         super_version->imm->MultiGet(read_options, &range, callback);
       }
@@ -4859,7 +4863,8 @@ Status DBImpl::GetLatestSequenceForKey(
   // Check if there is a record for this key in the latest memtable
   sv->mem->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context,
                &max_covering_tombstone_seq, seq, read_options,
-               nullptr /*read_callback*/, is_blob_index);
+               false /* immutable_memtable */, nullptr /*read_callback*/,
+               is_blob_index);
 
   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
     // unexpected error reading memtable.
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index fa32f4fd49..f85778fe07 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1546,7 +1546,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
       std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
           range_del_iters;
       auto range_del_iter =
-          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+          // This is called during recovery, where a live memtable is flushed
+          // directly. In this case, no fragmented tombstone list is cached in
+          // this memtable yet.
+          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
+                                         false /* immutable_memtable */);
       if (range_del_iter != nullptr) {
         range_del_iters.emplace_back(range_del_iter);
       }
diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc
index 5548ebc09d..f817228a2f 100644
--- a/db/db_impl/db_impl_readonly.cc
+++ b/db/db_impl/db_impl_readonly.cc
@@ -87,7 +87,8 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
   PERF_TIMER_STOP(get_snapshot_time);
   if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
                               &merge_context, &max_covering_tombstone_seq,
-                              read_options, &read_cb)) {
+                              read_options, false /* immutable_memtable */,
+                              &read_cb)) {
     pinnable_val->PinSelf();
     RecordTick(stats_, MEMTABLE_HIT);
   } else {
diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
index 6dd34c49bf..78d81c023a 100644
--- a/db/db_impl/db_impl_secondary.cc
+++ b/db/db_impl/db_impl_secondary.cc
@@ -261,6 +261,7 @@ Status DBImplSecondary::RecoverLogFiles(
             MemTable* new_mem =
                 cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
             cfd->mem()->SetNextLogNumber(log_number);
+            cfd->mem()->ConstructFragmentedRangeTombstones();
             cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
             new_mem->Ref();
             cfd->SetMemtable(new_mem);
@@ -391,7 +392,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
   std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
   if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), ts, &s,
                               &merge_context, &max_covering_tombstone_seq,
-                              read_options, &read_cb)) {
+                              read_options, false /* immutable_memtable */,
+                              &read_cb)) {
     done = true;
     pinnable_val->PinSelf();
     RecordTick(stats_, MEMTABLE_HIT);
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 0f539e2fa3..baa2f7460c 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -2065,6 +2065,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
                  "[%s] New memtable created with log file: #%" PRIu64
                  ". Immutable memtables: %d.\n",
                  cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
+  // There should be no concurrent write as the thread is at the front of
+  // writer queue
+  cfd->mem()->ConstructFragmentedRangeTombstones();
+
   mutex_.Lock();
   if (recycle_log_number != 0) {
     // Since renaming the file is done outside DB mutex, we need to ensure
diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc
index 306feaa398..81865ab120 100644
--- a/db/db_memtable_test.cc
+++ b/db/db_memtable_test.cc
@@ -263,7 +263,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
   SequenceNumber max_covering_tombstone_seq = 0;
   LookupKey lkey("key", kMaxSequenceNumber);
   bool res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status,
-                      &merge_context, &max_covering_tombstone_seq, roptions);
+                      &merge_context, &max_covering_tombstone_seq, roptions,
+                      false /* immutable_memtable */);
   ASSERT_OK(status);
   ASSERT_TRUE(res);
   uint64_t ivalue = DecodeFixed64(Slice(value).data());
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 32cf1e1731..7304d6e156 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -390,7 +390,8 @@ Status FlushJob::MemPurge() {
       range_del_iters;
   for (MemTable* m : mems_) {
     memtables.push_back(m->NewIterator(ro, &arena));
-    auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+    auto* range_del_iter = m->NewRangeTombstoneIterator(
+        ro, kMaxSequenceNumber, true /* immutable_memtable */);
     if (range_del_iter != nullptr) {
       range_del_iters.emplace_back(range_del_iter);
     }
@@ -585,6 +586,8 @@ Status FlushJob::MemPurge() {
       // as in need of being flushed.
       if (new_mem->ApproximateMemoryUsage() < maxSize &&
           !(new_mem->ShouldFlushNow())) {
+        // Construct fragmented memtable range tombstones without mutex
+        new_mem->ConstructFragmentedRangeTombstones();
         db_mutex_->Lock();
         uint64_t new_mem_id = mems_[0]->GetID();
 
@@ -732,7 +735,8 @@ bool FlushJob::MemPurgeDecider(double threshold) {
 
       // Estimate if the sample entry is valid or not.
       get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
-                        &max_covering_tombstone_seq, &sqno, ro);
+                        &max_covering_tombstone_seq, &sqno, ro,
+                        true /* immutable_memtable */);
       if (!get_res) {
         ROCKS_LOG_WARN(
             db_options_.info_log,
@@ -773,7 +777,8 @@ bool FlushJob::MemPurgeDecider(double threshold) {
              next_mem_iter != std::end(mems_); next_mem_iter++) {
           if ((*next_mem_iter)
                   ->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
-                        &max_covering_tombstone_seq, &sqno, ro)) {
+                        &max_covering_tombstone_seq, &sqno, ro,
+                        true /* immutable_memtable */)) {
             not_in_next_mems = false;
             break;
           }
@@ -857,8 +862,8 @@ Status FlushJob::WriteLevel0Table() {
           "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
           cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
       memtables.push_back(m->NewIterator(ro, &arena));
-      auto* range_del_iter =
-          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+      auto* range_del_iter = m->NewRangeTombstoneIterator(
+          ro, kMaxSequenceNumber, true /* immutable_memtable */);
       if (range_del_iter != nullptr) {
         range_del_iters.emplace_back(range_del_iter);
       }
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index 3e67f09fe8..4b35bc9b40 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -242,6 +242,7 @@ TEST_F(FlushJobTest, NonEmpty) {
   mock::SortKVVector(&inserted_keys);
 
   autovector<MemTable*> to_delete;
+  new_mem->ConstructFragmentedRangeTombstones();
   cfd->imm()->Add(new_mem, &to_delete);
   for (auto& m : to_delete) {
     delete m;
@@ -303,6 +304,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
 
   autovector<MemTable*> to_delete;
   for (auto mem : new_mems) {
+    mem->ConstructFragmentedRangeTombstones();
     cfd->imm()->Add(mem, &to_delete);
   }
 
@@ -372,7 +374,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
         ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value,
                            nullptr /* kv_prot_info */));
       }
-
+      mem->ConstructFragmentedRangeTombstones();
       cfd->imm()->Add(mem, &to_delete);
     }
     largest_seqs.push_back(curr_seqno - 1);
@@ -505,6 +507,7 @@ TEST_F(FlushJobTest, Snapshots) {
   mock::SortKVVector(&inserted_keys);
 
   autovector<MemTable*> to_delete;
+  new_mem->ConstructFragmentedRangeTombstones();
   cfd->imm()->Add(new_mem, &to_delete);
   for (auto& m : to_delete) {
     delete m;
@@ -559,6 +562,7 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
 
   autovector<MemTable*> to_delete;
   for (auto mem : new_mems) {
+    mem->ConstructFragmentedRangeTombstones();
     cfd->imm()->Add(mem, &to_delete);
   }
 
@@ -638,6 +642,7 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) {
     SequenceNumber seq = (curr_seq_++);
     AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
                           ValueType::kTypeDeletionWithTimestamp, "");
+    new_mem->ConstructFragmentedRangeTombstones();
     cfd->imm()->Add(new_mem, &to_delete);
   }
 
@@ -690,6 +695,7 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
       AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
                             ValueType::kTypeValue, "0_value");
     }
+    new_mem->ConstructFragmentedRangeTombstones();
     cfd->imm()->Add(new_mem, &to_delete);
   }
 
diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc
index 683a151643..29ae1d0ad9 100644
--- a/db/forward_iterator.cc
+++ b/db/forward_iterator.cc
@@ -668,7 +668,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
   if (!read_options_.ignore_range_deletions) {
     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
         sv_->mem->NewRangeTombstoneIterator(
-            read_options_, sv_->current->version_set()->LastSequence()));
+            read_options_, sv_->current->version_set()->LastSequence(),
+            false /* immutable_memtable */));
     range_del_agg.AddTombstones(std::move(range_del_iter));
     // Always return Status::OK().
     Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
@@ -733,7 +734,8 @@ void ForwardIterator::RenewIterators() {
   if (!read_options_.ignore_range_deletions) {
     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
         svnew->mem->NewRangeTombstoneIterator(
-            read_options_, sv_->current->version_set()->LastSequence()));
+            read_options_, sv_->current->version_set()->LastSequence(),
+            false /* immutable_memtable */));
     range_del_agg.AddTombstones(std::move(range_del_iter));
     // Always return Status::OK().
     Status temp_s = svnew->imm->AddRangeTombstoneIterators(
diff --git a/db/memtable.cc b/db/memtable.cc
index ab8c6e2ac9..645dd065f2 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -445,16 +445,28 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
 }
 
 FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
-    const ReadOptions& read_options, SequenceNumber read_seq) {
+    const ReadOptions& read_options, SequenceNumber read_seq,
+    bool immutable_memtable) {
   if (read_options.ignore_range_deletions ||
       is_range_del_table_empty_.load(std::memory_order_relaxed)) {
     return nullptr;
   }
-  return NewRangeTombstoneIteratorInternal(read_options, read_seq);
+  return NewRangeTombstoneIteratorInternal(read_options, read_seq,
+                                           immutable_memtable);
 }
 
 FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
-    const ReadOptions& read_options, SequenceNumber read_seq) {
+    const ReadOptions& read_options, SequenceNumber read_seq,
+    bool immutable_memtable) {
+  if (immutable_memtable) {
+    // Note that caller should already have verified that
+    // !is_range_del_table_empty_
+    assert(IsFragmentedRangeTombstonesConstructed());
+    return new FragmentedRangeTombstoneIterator(
+        fragmented_range_tombstone_list_.get(), comparator_.comparator,
+        read_seq);
+  }
+
   auto* unfragmented_iter = new MemTableIterator(
       *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
   auto fragmented_tombstone_list =
@@ -467,6 +479,21 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
   return fragmented_iter;
 }
 
+void MemTable::ConstructFragmentedRangeTombstones() {
+  assert(!IsFragmentedRangeTombstonesConstructed(false));
+  // There should be no concurrent Construction
+  if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) {
+    auto* unfragmented_iter =
+        new MemTableIterator(*this, ReadOptions(), nullptr /* arena */,
+                             true /* use_range_del_table */);
+
+    fragmented_range_tombstone_list_ =
+        std::make_unique<FragmentedRangeTombstoneList>(
+            std::unique_ptr<InternalIterator>(unfragmented_iter),
+            comparator_.comparator);
+  }
+}
+
 port::RWMutex* MemTable::GetLock(const Slice& key) {
   return &locks_[GetSliceRangedNPHash(key, locks_.size())];
 }
@@ -885,7 +912,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
                    MergeContext* merge_context,
                    SequenceNumber* max_covering_tombstone_seq,
                    SequenceNumber* seq, const ReadOptions& read_opts,
-                   ReadCallback* callback, bool* is_blob_index, bool do_merge) {
+                   bool immutable_memtable, ReadCallback* callback,
+                   bool* is_blob_index, bool do_merge) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -895,7 +923,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
 
   std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
       NewRangeTombstoneIterator(read_opts,
-                                GetInternalKeySeqno(key.internal_key())));
+                                GetInternalKeySeqno(key.internal_key()),
+                                immutable_memtable));
   if (range_del_iter != nullptr) {
     *max_covering_tombstone_seq =
         std::max(*max_covering_tombstone_seq,
@@ -977,7 +1006,7 @@ void MemTable::GetFromTable(const LookupKey& key,
 }
 
 void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                        ReadCallback* callback) {
+                        ReadCallback* callback, bool immutable_memtable) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -1024,7 +1053,8 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     if (!no_range_del) {
       std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
           NewRangeTombstoneIteratorInternal(
-              read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
+              read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
+              immutable_memtable));
       iter->max_covering_tombstone_seq = std::max(
           iter->max_covering_tombstone_seq,
           range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
diff --git a/db/memtable.h b/db/memtable.h
index fe038a90b7..80d23657c2 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -202,8 +202,19 @@ class MemTable {
   //        those allocated in arena.
   InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
 
+  // Returns an iterator that yields the range tombstones of the memtable.
+  // The caller must ensure that the underlying MemTable remains live
+  // while the returned iterator is live.
+  // @param immutable_memtable Whether this memtable is an immutable memtable.
+  // This information is not stored in memtable itself, so it needs to be
+  // specified by the caller. This flag is used internally to decide whether a
+  // cached fragmented range tombstone list can be returned. This cached version
+  // is constructed when a memtable becomes immutable. Setting the flag to false
+  // will always yield correct result, but may incur performance penalty as it
+  // always creates a new fragmented range tombstone list.
   FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
-      const ReadOptions& read_options, SequenceNumber read_seq);
+      const ReadOptions& read_options, SequenceNumber read_seq,
+      bool immutable_memtable);
 
   Status VerifyEncodedEntry(Slice encoded,
                             const ProtectionInfoKVOS64& kv_prot_info);
@@ -244,35 +255,44 @@ class MemTable {
   // If do_merge = false then any Merge Operands encountered for key are simply
   // stored in merge_context.operands_list and never actually merged to get a
   // final value. The raw Merge Operands are eventually returned to the user.
+  // @param immutable_memtable Whether this memtable is immutable. Used
+  // internally by NewRangeTombstoneIterator(). See comment above
+  // NewRangeTombstoneIterator() for more detail.
   bool Get(const LookupKey& key, std::string* value, Status* s,
            MergeContext* merge_context,
            SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
-           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
-           bool* is_blob_index = nullptr, bool do_merge = true) {
+           const ReadOptions& read_opts, bool immutable_memtable,
+           ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
+           bool do_merge = true) {
     return Get(key, value, /*timestamp=*/nullptr, s, merge_context,
-               max_covering_tombstone_seq, seq, read_opts, callback,
-               is_blob_index, do_merge);
+               max_covering_tombstone_seq, seq, read_opts, immutable_memtable,
+               callback, is_blob_index, do_merge);
   }
 
   bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
            Status* s, MergeContext* merge_context,
            SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
-           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
-           bool* is_blob_index = nullptr, bool do_merge = true);
+           const ReadOptions& read_opts, bool immutable_memtable,
+           ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
+           bool do_merge = true);
 
   bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
            Status* s, MergeContext* merge_context,
            SequenceNumber* max_covering_tombstone_seq,
-           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
-           bool* is_blob_index = nullptr, bool do_merge = true) {
+           const ReadOptions& read_opts, bool immutable_memtable,
+           ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
+           bool do_merge = true) {
     SequenceNumber seq;
     return Get(key, value, timestamp, s, merge_context,
-               max_covering_tombstone_seq, &seq, read_opts, callback,
-               is_blob_index, do_merge);
+               max_covering_tombstone_seq, &seq, read_opts, immutable_memtable,
+               callback, is_blob_index, do_merge);
   }
 
+  // @param immutable_memtable Whether this memtable is immutable. Used
+  // internally by NewRangeTombstoneIterator(). See comment above
+  // NewRangeTombstoneIterator() for more detail.
   void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                ReadCallback* callback);
+                ReadCallback* callback, bool immutable_memtable);
 
   // If `key` exists in current memtable with type value_type and the existing
   // value is at least as large as the new value, updates it in-place. Otherwise
@@ -502,6 +522,23 @@ class MemTable {
   // Returns a heuristic flush decision
   bool ShouldFlushNow();
 
+  void ConstructFragmentedRangeTombstones();
+
+  // Returns whether a fragmented range tombstone list is already constructed
+  // for this memtable. It should be constructed right before a memtable is
+  // added to an immutable memtable list. Note that if a memtable does not have
+  // any range tombstone, then no range tombstone list will ever be constructed.
+  // @param allow_empty Specifies whether a memtable with no range tombstone is
+  // considered to have its fragmented range tombstone list constructed.
+  bool IsFragmentedRangeTombstonesConstructed(bool allow_empty = true) const {
+    if (allow_empty) {
+      return fragmented_range_tombstone_list_.get() != nullptr ||
+             is_range_del_table_empty_;
+    } else {
+      return fragmented_range_tombstone_list_.get() != nullptr;
+    }
+  }
+
  private:
   enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
 
@@ -601,9 +638,18 @@ class MemTable {
                     MergeContext* merge_context, SequenceNumber* seq,
                     bool* found_final_value, bool* merge_in_progress);
 
-  // Always returns non-null and assumes certain pre-checks are done
+  // Always returns non-null and assumes certain pre-checks (e.g.,
+  // is_range_del_table_empty_) are done. This is only valid during the lifetime
+  // of the underlying memtable.
   FragmentedRangeTombstoneIterator* NewRangeTombstoneIteratorInternal(
-      const ReadOptions& read_options, SequenceNumber read_seq);
+      const ReadOptions& read_options, SequenceNumber read_seq,
+      bool immutable_memtable);
+
+  // The fragmented range tombstones of this memtable.
+  // This is constructed when this memtable becomes immutable
+  // if !is_range_del_table_empty_.
+  std::unique_ptr<FragmentedRangeTombstoneList>
+      fragmented_range_tombstone_list_;
 };
 
 extern const char* EncodeKey(std::string* scratch, const Slice& target);
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 72f309fedf..858404a0e1 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -119,7 +119,8 @@ void MemTableListVersion::MultiGet(const ReadOptions& read_options,
                                    MultiGetRange* range,
                                    ReadCallback* callback) {
   for (auto memtable : memlist_) {
-    memtable->MultiGet(read_options, range, callback);
+    memtable->MultiGet(read_options, range, callback,
+                       true /* immutable_memtable */);
     if (range->empty()) {
       return;
     }
@@ -130,9 +131,10 @@ bool MemTableListVersion::GetMergeOperands(
     const LookupKey& key, Status* s, MergeContext* merge_context,
     SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
   for (MemTable* memtable : memlist_) {
-    bool done = memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
-                              merge_context, max_covering_tombstone_seq,
-                              read_opts, nullptr, nullptr, false);
+    bool done =
+        memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
+                      merge_context, max_covering_tombstone_seq, read_opts,
+                      true /* immutable_memtable */, nullptr, nullptr, false);
     if (done) {
       return true;
     }
@@ -157,11 +159,13 @@ bool MemTableListVersion::GetFromList(
   *seq = kMaxSequenceNumber;
 
   for (auto& memtable : *list) {
+    assert(memtable->IsFragmentedRangeTombstonesConstructed());
     SequenceNumber current_seq = kMaxSequenceNumber;
 
-    bool done = memtable->Get(key, value, timestamp, s, merge_context,
-                              max_covering_tombstone_seq, &current_seq,
-                              read_opts, callback, is_blob_index);
+    bool done =
+        memtable->Get(key, value, timestamp, s, merge_context,
+                      max_covering_tombstone_seq, &current_seq, read_opts,
+                      true /* immutable_memtable */, callback, is_blob_index);
     if (*seq == kMaxSequenceNumber) {
       // Store the most recent sequence number of any operation on this key.
       // Since we only care about the most recent change, we only need to
@@ -194,8 +198,10 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
                                 ? read_opts.snapshot->GetSequenceNumber()
                                 : kMaxSequenceNumber;
   for (auto& m : memlist_) {
+    assert(m->IsFragmentedRangeTombstonesConstructed());
     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
-        m->NewRangeTombstoneIterator(read_opts, read_seq));
+        m->NewRangeTombstoneIterator(read_opts, read_seq,
+                                     true /* immutable_memtable */));
     range_del_agg->AddTombstones(std::move(range_del_iter));
   }
   return Status::OK();
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index 6804e311cd..9831da231b 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -267,22 +267,25 @@ TEST_F(MemTableListTest, GetTest) {
   // Fetch the newly written keys
   merge_context.Clear();
   found = mem->Get(LookupKey("key1", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value1");
 
   merge_context.Clear();
   found = mem->Get(LookupKey("key1", 2), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   // MemTable found out that this key is *not* found (at this sequence#)
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
   found = mem->Get(LookupKey("key2", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.2");
 
@@ -290,6 +293,9 @@ TEST_F(MemTableListTest, GetTest) {
   ASSERT_EQ(1, mem->num_deletes());
 
   // Add memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem->ConstructFragmentedRangeTombstones();
   list.Add(mem, &to_delete);
 
   SequenceNumber saved_seq = seq;
@@ -306,6 +312,9 @@ TEST_F(MemTableListTest, GetTest) {
                       nullptr /* kv_prot_info */));
 
   // Add second memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem2->ConstructFragmentedRangeTombstones();
   list.Add(mem2, &to_delete);
 
   // Fetch keys via MemTableList
@@ -388,19 +397,24 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   // Fetch the newly written keys
   merge_context.Clear();
   found = mem->Get(LookupKey("key1", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   // MemTable found out that this key is *not* found (at this sequence#)
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
   found = mem->Get(LookupKey("key2", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.2");
 
   // Add memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem->ConstructFragmentedRangeTombstones();
   list.Add(mem, &to_delete);
   ASSERT_EQ(0, to_delete.size());
 
@@ -472,6 +486,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
                       nullptr /* kv_prot_info */));
 
   // Add second memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem2->ConstructFragmentedRangeTombstones();
   list.Add(mem2, &to_delete);
   ASSERT_EQ(0, to_delete.size());
 
@@ -493,6 +510,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
   mem3->Ref();
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem3->ConstructFragmentedRangeTombstones();
   list.Add(mem3, &to_delete);
   ASSERT_EQ(1, list.NumNotFlushed());
   ASSERT_EQ(1, list.NumFlushed());
diff --git a/db/repair.cc b/db/repair.cc
index 014db4e8ad..482c3af7d9 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -431,8 +431,8 @@ class Repairer {
       auto write_hint = cfd->CalculateSSTWriteHint(0);
       std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
           range_del_iters;
-      auto range_del_iter =
-          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+      auto range_del_iter = mem->NewRangeTombstoneIterator(
+          ro, kMaxSequenceNumber, false /* immutable_memtable */);
       if (range_del_iter != nullptr) {
         range_del_iters.emplace_back(range_del_iter);
       }
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index d8779435b0..40f23f54b7 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -60,7 +60,8 @@ static std::string PrintContents(WriteBatch* b,
       arena_iter_guard.set(iter);
     } else {
       iter = mem->NewRangeTombstoneIterator(ReadOptions(),
-                                            kMaxSequenceNumber /* read_seq */);
+                                            kMaxSequenceNumber /* read_seq */,
+                                            false /* immutable_memtable */);
       iter_guard.reset(iter);
     }
     if (iter == nullptr) {
diff --git a/table/table_test.cc b/table/table_test.cc
index fc45ce3476..62ff4ed66d 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -4186,7 +4186,8 @@ TEST_F(MemTableTest, Simple) {
       arena_iter_guard.set(iter);
     } else {
       iter = GetMemTable()->NewRangeTombstoneIterator(
-          ReadOptions(), kMaxSequenceNumber /* read_seq */);
+          ReadOptions(), kMaxSequenceNumber /* read_seq */,
+          false /* immutable_memtable */);
       iter_guard.reset(iter);
     }
     if (iter == nullptr) {
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 8ee1497051..1ea410318e 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -5099,6 +5099,7 @@ class Benchmark {
     int64_t num_written = 0;
     int64_t next_seq_db_at = num_ops;
     size_t id = 0;
+    int64_t num_range_deletions = 0;
 
     while ((num_per_key_gen != 0) && !duration.Done(entries_per_batch_)) {
       if (duration.GetStage() != stage) {
@@ -5296,6 +5297,7 @@ class Benchmark {
             (num_written - writes_before_delete_range_) %
                     writes_per_range_tombstone_ ==
                 0) {
+          num_range_deletions++;
           int64_t begin_num = key_gens[id]->Next();
           if (FLAGS_expand_range_tombstones) {
             for (int64_t offset = 0; offset < range_tombstone_width_;
@@ -5401,6 +5403,10 @@ class Benchmark {
               ".\nNumber of 'disposable entry delete': %" PRIu64 "\n",
               num_written, num_selective_deletes);
     }
+    if (num_range_deletions > 0) {
+      std::cout << "Number of range deletions: " << num_range_deletions
+                << std::endl;
+    }
     thread->stats.AddBytes(bytes);
   }