From 1e40696dd14d86b0e0330fd53d8114dc4d9011ad Mon Sep 17 00:00:00 2001
From: Cheng Chang <xcc@fb.com>
Date: Fri, 6 Nov 2020 16:30:44 -0800
Subject: [PATCH] Track WAL in MANIFEST: LogAndApply WAL events to MANIFEST
 (#7601)

Summary:
When a WAL is synced, an edit is written to MANIFEST.
After flushing memtables, the obsoleted WALs are piggybacked to MANIFEST while writing the new L0 files to MANIFEST.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7601

Test Plan:
`track_and_verify_wals_in_manifest` is enabled by default for all tests extending `DBBasicTest`, and in db_stress_test.
Unit test `wal_edit_test`, `version_edit_test`, and `version_set_test` are also updated.
Watch all tests to pass.

Reviewed By: ltamasi

Differential Revision: D24553957

Pulled By: cheng-chang

fbshipit-source-id: 66a569ff1bdced38e22900bd240b73113906e040
---
 db/db_impl/db_impl.cc                  | 50 +++++++++++---
 db/db_impl/db_impl.h                   | 14 +++-
 db/db_impl/db_impl_compaction_flush.cc |  6 +-
 db/db_impl/db_impl_files.cc            | 26 +++++---
 db/db_impl/db_impl_open.cc             | 14 ++--
 db/db_impl/db_impl_write.cc            | 12 +++-
 db/db_range_del_test.cc                |  2 +-
 db/db_test_util.cc                     |  1 +
 db/memtable_list.cc                    | 53 +++++++++++++--
 db/version_edit.cc                     | 29 ++++-----
 db/version_edit.h                      | 22 ++++---
 db/version_edit_handler.cc             |  3 +-
 db/version_edit_test.cc                | 29 +++------
 db/version_set.cc                      |  4 +-
 db/version_set_test.cc                 | 90 ++++++++++++++++++--------
 db/wal_edit.cc                         | 26 +-------
 db/wal_edit.h                          | 20 +++---
 db/wal_edit_test.cc                    | 16 ++---
 db_stress_tool/db_stress_test_base.cc  |  1 +
 19 files changed, 263 insertions(+), 155 deletions(-)

diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index f0737cafb4..5a1fde9aac 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1283,7 +1283,11 @@ Status DBImpl::SyncWAL() {
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
   {
     InstrumentedMutexLock l(&mutex_);
-    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
+    if (status.ok()) {
+      status = MarkLogsSynced(current_log_number, need_log_dir_sync);
+    } else {
+      MarkLogsNotSynced(current_log_number);
+    }
   }
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
 
@@ -1309,27 +1313,53 @@ Status DBImpl::UnlockWAL() {
   return Status::OK();
 }
 
-void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
-                            const Status& status) {
+Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
   mutex_.AssertHeld();
-  if (synced_dir && logfile_number_ == up_to && status.ok()) {
+  if (synced_dir && logfile_number_ == up_to) {
     log_dir_synced_ = true;
   }
+  VersionEdit synced_wals;
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
-    auto& log = *it;
-    assert(log.getting_synced);
-    if (status.ok() && logs_.size() > 1) {
-      logs_to_free_.push_back(log.ReleaseWriter());
+    auto& wal = *it;
+    assert(wal.getting_synced);
+    if (logs_.size() > 1) {
+      if (immutable_db_options_.track_and_verify_wals_in_manifest) {
+        synced_wals.AddWal(wal.number,
+                           WalMetadata(wal.writer->file()->GetFileSize()));
+      }
+      logs_to_free_.push_back(wal.ReleaseWriter());
       // To modify logs_ both mutex_ and log_write_mutex_ must be held
       InstrumentedMutexLock l(&log_write_mutex_);
       it = logs_.erase(it);
     } else {
-      log.getting_synced = false;
+      wal.getting_synced = false;
       ++it;
     }
   }
-  assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
+  assert(logs_.empty() || logs_[0].number > up_to ||
          (logs_.size() == 1 && !logs_[0].getting_synced));
+
+  Status s;
+  if (synced_wals.IsWalAddition()) {
+    // not empty, write to MANIFEST.
+    s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
+    if (!s.ok() && versions_->io_status().IsIOError()) {
+      s = error_handler_.SetBGError(versions_->io_status(),
+                                    BackgroundErrorReason::kManifestWrite);
+    }
+  }
+  log_sync_cv_.SignalAll();
+  return s;
+}
+
+void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
+  mutex_.AssertHeld();
+  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
+       ++it) {
+    auto& wal = *it;
+    assert(wal.getting_synced);
+    wal.getting_synced = false;
+  }
   log_sync_cv_.SignalAll();
 }
 
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 68b053f312..1ffff8fa69 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1702,7 +1702,9 @@ class DBImpl : public DB {
       std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
 
   // helper function to call after some of the logs_ were synced
-  void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
+  Status MarkLogsSynced(uint64_t up_to, bool synced_dir);
+  // WALs with log number up to up_to are not synced successfully.
+  void MarkLogsNotSynced(uint64_t up_to);
 
   SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
                                 bool lock = true);
@@ -2204,12 +2206,18 @@ extern CompressionType GetCompressionFlush(
 // `memtables_to_flush`) will be flushed and thus will not depend on any WAL
 // file.
 // The function is only applicable to 2pc mode.
-extern uint64_t PrecomputeMinLogNumberToKeep(
+extern uint64_t PrecomputeMinLogNumberToKeep2PC(
     VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
-    autovector<VersionEdit*> edit_list,
+    const autovector<VersionEdit*>& edit_list,
     const autovector<MemTable*>& memtables_to_flush,
     LogsWithPrepTracker* prep_tracker);
 
+// In non-2PC mode, WALs with log number < the returned number can be
+// deleted after the cfd_to_flush column family is flushed successfully.
+extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
+    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
+    const autovector<VersionEdit*>& edit_list);
+
 // `cfd_to_flush` is the column family whose memtable will be flushed and thus
 // will not depend on any WAL file. nullptr means no memtable is being flushed.
 // The function is only applicable to 2pc mode.
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 742f8882fe..518cabf115 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -123,7 +123,11 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
 
     // "number <= current_log_number - 1" is equivalent to
     // "number < current_log_number".
-    MarkLogsSynced(current_log_number - 1, true, io_s);
+    if (io_s.ok()) {
+      io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
+    } else {
+      MarkLogsNotSynced(current_log_number - 1);
+    }
     if (!io_s.ok()) {
       if (total_log_size_ > 0) {
         error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc
index 72faf8a037..d825c406ca 100644
--- a/db/db_impl/db_impl_files.cc
+++ b/db/db_impl/db_impl_files.cc
@@ -680,16 +680,10 @@ uint64_t FindMinPrepLogReferencedByMemTable(
   return min_log;
 }
 
-uint64_t PrecomputeMinLogNumberToKeep(
+uint64_t PrecomputeMinLogNumberToKeepNon2PC(
     VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
-    autovector<VersionEdit*> edit_list,
-    const autovector<MemTable*>& memtables_to_flush,
-    LogsWithPrepTracker* prep_tracker) {
+    const autovector<VersionEdit*>& edit_list) {
   assert(vset != nullptr);
-  assert(prep_tracker != nullptr);
-  // Calculate updated min_log_number_to_keep
-  // Since the function should only be called in 2pc mode, log number in
-  // the version edit should be sufficient.
 
   // Precompute the min log number containing unflushed data for the column
   // family being flushed (`cfd_to_flush`).
@@ -713,6 +707,22 @@ uint64_t PrecomputeMinLogNumberToKeep(
     min_log_number_to_keep =
         std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
   }
+  return min_log_number_to_keep;
+}
+
+uint64_t PrecomputeMinLogNumberToKeep2PC(
+    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
+    const autovector<VersionEdit*>& edit_list,
+    const autovector<MemTable*>& memtables_to_flush,
+    LogsWithPrepTracker* prep_tracker) {
+  assert(vset != nullptr);
+  assert(prep_tracker != nullptr);
+  // Calculate updated min_log_number_to_keep
+  // Since the function should only be called in 2pc mode, log number in
+  // the version edit should be sufficient.
+
+  uint64_t min_log_number_to_keep =
+      PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list);
 
   // if are 2pc we must consider logs containing prepared
   // sections of outstanding transactions.
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 8f660924e6..90759e9c5b 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -589,18 +589,20 @@ Status DBImpl::Recover(
     }
 
     if (immutable_db_options_.track_and_verify_wals_in_manifest) {
-      // Verify WALs in MANIFEST.
-      s = versions_->GetWalSet().CheckWals(env_, wal_files);
+      if (!immutable_db_options_.best_efforts_recovery) {
+        // Verify WALs in MANIFEST.
+        s = versions_->GetWalSet().CheckWals(env_, wal_files);
+      }  // else since best effort recovery does not recover from WALs, no need
+         // to check WALs.
     } else if (!versions_->GetWalSet().GetWals().empty()) {
       // Tracking is disabled, clear previously tracked WALs from MANIFEST,
       // otherwise, in the future, if WAL tracking is enabled again,
       // since the WALs deleted when WAL tracking is disabled are not persisted
       // into MANIFEST, WAL check may fail.
       VersionEdit edit;
-      for (const auto& wal : versions_->GetWalSet().GetWals()) {
-        WalNumber number = wal.first;
-        edit.DeleteWal(number);
-      }
+      WalNumber max_wal_number =
+          versions_->GetWalSet().GetWals().rbegin()->first;
+      edit.DeleteWalsBefore(max_wal_number + 1);
       s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
     }
     if (!s.ok()) {
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 5d4f88f0b8..1cab2b6c05 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -426,7 +426,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
   if (need_log_sync) {
     mutex_.Lock();
-    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
+    if (status.ok()) {
+      status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+    } else {
+      MarkLogsNotSynced(logfile_number_);
+    }
     mutex_.Unlock();
     // Requesting sync with two_write_queues_ is expected to be very rare. We
     // hence provide a simple implementation that is not necessarily efficient.
@@ -551,7 +555,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
 
     if (need_log_sync) {
       mutex_.Lock();
-      MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
+      if (w.status.ok()) {
+        w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+      } else {
+        MarkLogsNotSynced(logfile_number_);
+      }
       mutex_.Unlock();
     }
 
diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc
index 9428ea6c61..403b221e68 100644
--- a/db/db_range_del_test.cc
+++ b/db/db_range_del_test.cc
@@ -159,7 +159,7 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
   // Want max_compaction_bytes to trigger the end of compaction output file, not
   // target_file_size_base, so make the latter much bigger
   opts.target_file_size_base = 100 * opts.max_compaction_bytes;
-  Reopen(opts);
+  DestroyAndReopen(opts);
 
   // snapshot protects range tombstone from dropping due to becoming obsolete.
   const Snapshot* snapshot = db_->GetSnapshot();
diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index 0d52eebd90..42c5576430 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -340,6 +340,7 @@ Options DBTestBase::GetDefaultOptions() const {
   options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
   options.compaction_pri = CompactionPri::kByCompensatedSize;
   options.env = env_;
+  options.track_and_verify_wals_in_manifest = true;
   return options;
 }
 
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 1e1758d595..77c2ba51e3 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -473,12 +473,27 @@ Status MemTableList::TryInstallMemtableFlushResults(
 
     // TODO(myabandeh): Not sure how batch_count could be 0 here.
     if (batch_count > 0) {
+      uint64_t min_wal_number_to_keep = 0;
       if (vset->db_options()->allow_2pc) {
         assert(edit_list.size() > 0);
+        min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
+            vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
         // We piggyback the information of  earliest log file to keep in the
         // manifest entry for the last file flushed.
-        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
-            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
+        edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
+      } else {
+        min_wal_number_to_keep =
+            PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
+      }
+
+      std::unique_ptr<VersionEdit> wal_deletion;
+      if (vset->db_options()->track_and_verify_wals_in_manifest) {
+        const auto& wals = vset->GetWalSet().GetWals();
+        if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
+          wal_deletion.reset(new VersionEdit);
+          wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
+          edit_list.push_back(wal_deletion.get());
+        }
       }
 
       const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
@@ -704,6 +719,10 @@ Status InstallMemtableAtomicFlushResults(
   if (imm_lists != nullptr) {
     assert(imm_lists->size() == num);
   }
+  if (num == 0) {
+    return Status::OK();
+  }
+
   for (size_t k = 0; k != num; ++k) {
 #ifndef NDEBUG
     const auto* imm =
@@ -732,12 +751,36 @@ Status InstallMemtableAtomicFlushResults(
     ++num_entries;
     edit_lists.emplace_back(edits);
   }
+
+  // TODO(cc): after https://github.com/facebook/rocksdb/pull/7570, handle 2pc
+  // here.
+  std::unique_ptr<VersionEdit> wal_deletion;
+  if (vset->db_options()->track_and_verify_wals_in_manifest) {
+    uint64_t min_wal_number_to_keep =
+        PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[0], edit_lists[0]);
+    for (size_t i = 1; i < cfds.size(); i++) {
+      min_wal_number_to_keep = std::min(
+          min_wal_number_to_keep,
+          PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[i], edit_lists[i]));
+    }
+    const auto& wals = vset->GetWalSet().GetWals();
+    if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
+      wal_deletion.reset(new VersionEdit);
+      wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
+      edit_lists.back().push_back(wal_deletion.get());
+      ++num_entries;
+    }
+  }
+
   // Mark the version edits as an atomic group if the number of version edits
   // exceeds 1.
   if (cfds.size() > 1) {
-    for (auto& edits : edit_lists) {
-      assert(edits.size() == 1);
-      edits[0]->MarkAtomicGroup(--num_entries);
+    for (size_t i = 0; i < edit_lists.size(); i++) {
+      assert((edit_lists[i].size() == 1) ||
+             ((edit_lists[i].size() == 2) && (i == edit_lists.size() - 1)));
+      for (auto& e : edit_lists[i]) {
+        e->MarkAtomicGroup(--num_entries);
+      }
     }
     assert(0 == num_entries);
   }
diff --git a/db/version_edit.cc b/db/version_edit.cc
index 8879f0e1bc..8f4fb5766a 100644
--- a/db/version_edit.cc
+++ b/db/version_edit.cc
@@ -89,7 +89,7 @@ void VersionEdit::Clear() {
   blob_file_additions_.clear();
   blob_file_garbages_.clear();
   wal_additions_.clear();
-  wal_deletions_.clear();
+  wal_deletion_.Reset();
   column_family_ = 0;
   is_column_family_add_ = false;
   is_column_family_drop_ = false;
@@ -229,9 +229,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
     wal_addition.EncodeTo(dst);
   }
 
-  for (const auto& wal_deletion : wal_deletions_) {
+  if (!wal_deletion_.IsEmpty()) {
     PutVarint32(dst, kWalDeletion);
-    wal_deletion.EncodeTo(dst);
+    wal_deletion_.EncodeTo(dst);
   }
 
   // 0 is default and does not need to be explicitly written
@@ -576,7 +576,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
           return s;
         }
 
-        wal_deletions_.emplace_back(std::move(wal_deletion));
+        wal_deletion_ = std::move(wal_deletion);
         break;
       }
 
@@ -725,9 +725,9 @@ std::string VersionEdit::DebugString(bool hex_key) const {
     r.append(wal_addition.DebugString());
   }
 
-  for (const auto& wal_deletion : wal_deletions_) {
+  if (!wal_deletion_.IsEmpty()) {
     r.append("\n  WalDeletion: ");
-    r.append(wal_deletion.DebugString());
+    r.append(wal_deletion_.DebugString());
   }
 
   r.append("\n  ColumnFamily: ");
@@ -854,18 +854,11 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
     jw.EndArray();
   }
 
-  if (!wal_deletions_.empty()) {
-    jw << "WalDeletions";
-
-    jw.StartArray();
-
-    for (const auto& wal_deletion : wal_deletions_) {
-      jw.StartArrayedObject();
-      jw << wal_deletion;
-      jw.EndArrayedObject();
-    }
-
-    jw.EndArray();
+  if (!wal_deletion_.IsEmpty()) {
+    jw << "WalDeletion";
+    jw.StartObject();
+    jw << wal_deletion_;
+    jw.EndObject();
   }
 
   jw << "ColumnFamily" << column_family_;
diff --git a/db/version_edit.h b/db/version_edit.h
index ff6a16c49c..9193a8369b 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -452,6 +452,7 @@ class VersionEdit {
   }
 
   // Add a WAL (either just created or closed).
+  // AddWal and DeleteWalsBefore cannot be called on the same VersionEdit.
   void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) {
     assert(NumEntries() == wal_additions_.size());
     wal_additions_.emplace_back(number, std::move(metadata));
@@ -463,22 +464,27 @@ class VersionEdit {
   bool IsWalAddition() const { return !wal_additions_.empty(); }
 
   // Delete a WAL (either directly deleted or archived).
-  void DeleteWal(WalNumber number) {
-    assert(NumEntries() == wal_deletions_.size());
-    wal_deletions_.emplace_back(number);
+  // AddWal and DeleteWalsBefore cannot be called on the same VersionEdit.
+  void DeleteWalsBefore(WalNumber number) {
+    assert((NumEntries() == 1) == !wal_deletion_.IsEmpty());
+    wal_deletion_ = WalDeletion(number);
   }
 
-  const WalDeletions& GetWalDeletions() const { return wal_deletions_; }
+  const WalDeletion& GetWalDeletion() const { return wal_deletion_; }
 
-  bool IsWalDeletion() const { return !wal_deletions_.empty(); }
+  bool IsWalDeletion() const { return !wal_deletion_.IsEmpty(); }
 
-  bool IsWalManipulation() const { return IsWalAddition() || IsWalDeletion(); }
+  bool IsWalManipulation() const {
+    size_t entries = NumEntries();
+    return (entries > 0) && ((entries == wal_additions_.size()) ||
+                             (entries == !wal_deletion_.IsEmpty()));
+  }
 
   // Number of edits
   size_t NumEntries() const {
     return new_files_.size() + deleted_files_.size() +
            blob_file_additions_.size() + blob_file_garbages_.size() +
-           wal_additions_.size() + wal_deletions_.size();
+           wal_additions_.size() + !wal_deletion_.IsEmpty();
   }
 
   void SetColumnFamily(uint32_t column_family_id) {
@@ -563,7 +569,7 @@ class VersionEdit {
   BlobFileGarbages blob_file_garbages_;
 
   WalAdditions wal_additions_;
-  WalDeletions wal_deletions_;
+  WalDeletion wal_deletion_;
 
   // Each version edit record should have column_family_ set
   // If it's not set, it is default (0)
diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc
index a1ce12d1b2..c5924f065c 100644
--- a/db/version_edit_handler.cc
+++ b/db/version_edit_handler.cc
@@ -201,7 +201,8 @@ Status VersionEditHandler::OnWalAddition(VersionEdit& edit) {
 
 Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) {
   assert(edit.IsWalDeletion());
-  return version_set_->wals_.DeleteWals(edit.GetWalDeletions());
+  return version_set_->wals_.DeleteWalsBefore(
+      edit.GetWalDeletion().GetLogNumber());
 }
 
 Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc
index 7236e0a2d9..88e98606a0 100644
--- a/db/version_edit_test.cc
+++ b/db/version_edit_test.cc
@@ -470,9 +470,7 @@ TEST_F(VersionEditTest, AddWalDebug) {
 
 TEST_F(VersionEditTest, DeleteWalEncodeDecode) {
   VersionEdit edit;
-  for (uint64_t log_number = 1; log_number <= 20; log_number++) {
-    edit.DeleteWal(log_number);
-  }
+  edit.DeleteWalsBefore(rand() % 100);
   TestEncodeDecode(edit);
 }
 
@@ -481,36 +479,29 @@ TEST_F(VersionEditTest, DeleteWalDebug) {
   constexpr std::array<uint64_t, n> kLogNumbers{{10, 20}};
 
   VersionEdit edit;
-  for (int i = 0; i < n; i++) {
-    edit.DeleteWal(kLogNumbers[i]);
-  }
+  edit.DeleteWalsBefore(kLogNumbers[n - 1]);
 
-  const WalDeletions& wals = edit.GetWalDeletions();
+  const WalDeletion& wal = edit.GetWalDeletion();
 
   ASSERT_TRUE(edit.IsWalDeletion());
-  ASSERT_EQ(wals.size(), n);
-  for (int i = 0; i < n; i++) {
-    const WalDeletion& wal = wals[i];
-    ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]);
-  }
+  ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[n - 1]);
 
   std::string expected_str = "VersionEdit {\n";
-  for (int i = 0; i < n; i++) {
+  {
     std::stringstream ss;
-    ss << "  WalDeletion: log_number: " << kLogNumbers[i] << "\n";
+    ss << "  WalDeletion: log_number: " << kLogNumbers[n - 1] << "\n";
     expected_str += ss.str();
   }
   expected_str += "  ColumnFamily: 0\n}\n";
   ASSERT_EQ(edit.DebugString(true), expected_str);
 
-  std::string expected_json = "{\"EditNumber\": 4, \"WalDeletions\": [";
-  for (int i = 0; i < n; i++) {
+  std::string expected_json = "{\"EditNumber\": 4, \"WalDeletion\": ";
+  {
     std::stringstream ss;
-    ss << "{\"LogNumber\": " << kLogNumbers[i] << "}";
-    if (i < n - 1) ss << ", ";
+    ss << "{\"LogNumber\": " << kLogNumbers[n - 1] << "}";
     expected_json += ss.str();
   }
-  expected_json += "], \"ColumnFamily\": 0}";
+  expected_json += ", \"ColumnFamily\": 0}";
   ASSERT_EQ(edit.DebugJSON(4, true), expected_json);
 }
 
diff --git a/db/version_set.cc b/db/version_set.cc
index 0fe09ef34f..f15b94e775 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -4179,7 +4179,7 @@ Status VersionSet::ProcessManifestWrites(
       if (e->IsWalAddition()) {
         s = wals_.AddWals(e->GetWalAdditions());
       } else if (e->IsWalDeletion()) {
-        s = wals_.DeleteWals(e->GetWalDeletions());
+        s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
       }
       if (!s.ok()) {
         break;
@@ -4527,7 +4527,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder(
       return s;
     }
   } else if (edit.IsWalDeletion()) {
-    Status s = wals_.DeleteWals(edit.GetWalDeletions());
+    Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber());
     if (!s.ok()) {
       return s;
     }
diff --git a/db/version_set_test.cc b/db/version_set_test.cc
index fce8e025fd..4d54cec924 100644
--- a/db/version_set_test.cc
+++ b/db/version_set_test.cc
@@ -1192,10 +1192,8 @@ TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
     edits.back()->AddWal(i, WalMetadata(i));
   }
   // Delete the first half of the WALs.
-  for (uint64_t i = 1; i <= kNumWals; i++) {
-    edits.emplace_back(new VersionEdit);
-    edits.back()->DeleteWal(i);
-  }
+  edits.emplace_back(new VersionEdit);
+  edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
 
   autovector<Version*> versions;
   SyncPoint::GetInstance()->SetCallBack(
@@ -1228,10 +1226,8 @@ TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
     edits.back()->AddWal(i, WalMetadata(i));
   }
   // Delete the first half of the WALs.
-  for (uint64_t i = 1; i <= kNumWals; i++) {
-    edits.emplace_back(new VersionEdit);
-    edits.back()->DeleteWal(i);
-  }
+  edits.emplace_back(new VersionEdit);
+  edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
   edits.emplace_back(new VersionEdit);
   edits.back()->SetDBId(kDBId);
 
@@ -1411,7 +1407,7 @@ TEST_F(VersionSetTest, WalDeletion) {
   // Delete the closed WAL.
   {
     VersionEdit edit;
-    edit.DeleteWal(kClosedLogNumber);
+    edit.DeleteWalsBefore(kNonClosedLogNumber);
 
     ASSERT_OK(LogAndApplyToDefaultCF(edit));
 
@@ -1549,39 +1545,83 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) {
   }
 }
 
-TEST_F(VersionSetTest, DeleteNonExistingWal) {
+TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
   NewDB();
 
-  constexpr WalNumber kLogNumber = 10;
-  constexpr WalNumber kNonExistingNumber = 11;
+  constexpr WalNumber kLogNumber0 = 10;
+  constexpr WalNumber kLogNumber1 = 20;
+  constexpr WalNumber kNonExistingNumber = 15;
+  constexpr uint64_t kSizeInBytes = 111;
+
+  {
+    // Add closed WALs.
+    VersionEdit edit;
+    WalMetadata wal(kSizeInBytes);
+    edit.AddWal(kLogNumber0, wal);
+    edit.AddWal(kLogNumber1, wal);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  {
+    // Delete WALs before a non-existing WAL.
+    VersionEdit edit;
+    edit.DeleteWalsBefore(kNonExistingNumber);
+
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  // Recover a new VersionSet, WAL0 is deleted, WAL1 is not.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 1);
+    ASSERT_TRUE(wals.find(kLogNumber1) != wals.end());
+  }
+}
+
+TEST_F(VersionSetTest, DeleteAllWals) {
+  NewDB();
+
+  constexpr WalNumber kMaxLogNumber = 10;
   constexpr uint64_t kSizeInBytes = 111;
 
   {
     // Add a closed WAL.
     VersionEdit edit;
     WalMetadata wal(kSizeInBytes);
-    edit.AddWal(kLogNumber, wal);
+    edit.AddWal(kMaxLogNumber, wal);
 
     ASSERT_OK(LogAndApplyToDefaultCF(edit));
   }
 
   {
-    // Delete a non-existing WAL.
     VersionEdit edit;
-    edit.DeleteWal(kNonExistingNumber);
+    edit.DeleteWalsBefore(kMaxLogNumber + 10);
 
-    Status s = LogAndApplyToDefaultCF(edit);
-    ASSERT_TRUE(s.IsCorruption());
-    ASSERT_TRUE(s.ToString().find("WAL 11 must exist before deletion") !=
-                std::string::npos)
-        << s.ToString();
+    ASSERT_OK(LogAndApplyToDefaultCF(edit));
+  }
+
+  // Recover a new VersionSet, all WALs are deleted.
+  {
+    std::unique_ptr<VersionSet> new_versions(
+        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
+                       &write_buffer_manager_, &write_controller_,
+                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
+    ASSERT_OK(new_versions->Recover(column_families_, false));
+    const auto& wals = new_versions->GetWalSet().GetWals();
+    ASSERT_EQ(wals.size(), 0);
   }
 }
 
 TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
   NewDB();
 
-  constexpr int kAtomicGroupSize = 10;
+  constexpr int kAtomicGroupSize = 7;
   constexpr uint64_t kNumWals = 5;
   const std::string kDBId = "db_db";
 
@@ -1599,11 +1639,9 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
   edits.back()->SetDBId(kDBId);
   edits.back()->MarkAtomicGroup(--remaining);
   // Delete the first added 4 WALs.
-  for (uint64_t i = 1; i < kNumWals; i++) {
-    edits.emplace_back(new VersionEdit);
-    edits.back()->DeleteWal(i);
-    edits.back()->MarkAtomicGroup(--remaining);
-  }
+  edits.emplace_back(new VersionEdit);
+  edits.back()->DeleteWalsBefore(kNumWals);
+  edits.back()->MarkAtomicGroup(--remaining);
   ASSERT_EQ(remaining, 0);
 
   Status s = LogAndApplyToDefaultCF(edits);
diff --git a/db/wal_edit.cc b/db/wal_edit.cc
index 70965a770a..6d639823fb 100644
--- a/db/wal_edit.cc
+++ b/db/wal_edit.cc
@@ -141,33 +141,11 @@ Status WalSet::AddWals(const WalAdditions& wals) {
   return s;
 }
 
-Status WalSet::DeleteWal(const WalDeletion& wal) {
-  auto it = wals_.find(wal.GetLogNumber());
-  // The WAL must exist.
-  if (it == wals_.end()) {
-    std::stringstream ss;
-    ss << "WAL " << wal.GetLogNumber() << " must exist before deletion";
-    return Status::Corruption("WalSet", ss.str());
-  }
-  wals_.erase(it);
+Status WalSet::DeleteWalsBefore(WalNumber wal) {
+  wals_.erase(wals_.begin(), wals_.lower_bound(wal));
   return Status::OK();
 }
 
-Status WalSet::DeleteWals(const WalDeletions& wals) {
-  Status s;
-  for (const WalDeletion& wal : wals) {
-    s = DeleteWal(wal);
-    if (!s.ok()) {
-      break;
-    }
-  }
-  return s;
-}
-
-void WalSet::DeleteWalsBefore(WalNumber number) {
-  wals_.erase(wals_.begin(), wals_.lower_bound(number));
-}
-
 void WalSet::Reset() { wals_.clear(); }
 
 Status WalSet::CheckWals(
diff --git a/db/wal_edit.h b/db/wal_edit.h
index 78510a2e06..1ec7c4bf9d 100644
--- a/db/wal_edit.h
+++ b/db/wal_edit.h
@@ -89,7 +89,7 @@ JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal);
 
 using WalAdditions = std::vector<WalAddition>;
 
-// Records the event of deleting a WAL.
+// Records the event of deleting WALs before the specified log number.
 class WalDeletion {
  public:
   WalDeletion() : number_(kEmpty) {}
@@ -104,6 +104,10 @@ class WalDeletion {
 
   std::string DebugString() const;
 
+  bool IsEmpty() const { return number_ == kEmpty; }
+
+  void Reset() { number_ = kEmpty; }
+
  private:
   static constexpr WalNumber kEmpty = 0;
 
@@ -113,11 +117,9 @@ class WalDeletion {
 std::ostream& operator<<(std::ostream& os, const WalDeletion& wal);
 JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal);
 
-using WalDeletions = std::vector<WalDeletion>;
-
 // Used in VersionSet to keep the current set of WALs.
 //
-// When a WAL is created, closed, deleted, or archived,
+// When a WAL is synced or becomes obsoleted,
 // a VersionEdit is logged to MANIFEST and
 // the WAL is added to or deleted from WalSet.
 //
@@ -132,15 +134,9 @@ class WalSet {
   Status AddWal(const WalAddition& wal);
   Status AddWals(const WalAdditions& wals);
 
-  // Delete WAL(s).
-  // The WAL to be deleted must exist and be closed, otherwise,
-  // return Status::Corruption.
+  // Delete WALs with log number smaller than the specified wal number.
   // Can happen when applying a VersionEdit or recovering from MANIFEST.
-  Status DeleteWal(const WalDeletion& wal);
-  Status DeleteWals(const WalDeletions& wals);
-
-  // Delete WALs with log number < wal_number.
-  void DeleteWalsBefore(WalNumber wal_number);
+  Status DeleteWalsBefore(WalNumber wal);
 
   // Resets the internal state.
   void Reset();
diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc
index cdfe1b9cf7..bc863e89ee 100644
--- a/db/wal_edit_test.cc
+++ b/db/wal_edit_test.cc
@@ -25,9 +25,7 @@ TEST(WalSet, AddDeleteReset) {
   ASSERT_EQ(wals.GetWals().size(), 10);
 
   // Delete WAL 1 - 5.
-  for (WalNumber log_number = 1; log_number <= 5; log_number++) {
-    wals.DeleteWal(WalDeletion(log_number));
-  }
+  wals.DeleteWalsBefore(6);
   ASSERT_EQ(wals.GetWals().size(), 5);
 
   WalNumber expected_log_number = 6;
@@ -74,13 +72,13 @@ TEST(WalSet, CreateTwice) {
               std::string::npos);
 }
 
-TEST(WalSet, DeleteNonExistingWal) {
-  constexpr WalNumber kNonExistingNumber = 100;
+TEST(WalSet, DeleteAllWals) {
+  constexpr WalNumber kMaxWalNumber = 10;
   WalSet wals;
-  Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber));
-  ASSERT_TRUE(s.IsCorruption());
-  ASSERT_TRUE(s.ToString().find("WAL 100 must exist before deletion") !=
-              std::string::npos);
+  for (WalNumber i = 1; i <= kMaxWalNumber; i++) {
+    wals.AddWal(WalAddition(i));
+  }
+  ASSERT_OK(wals.DeleteWalsBefore(kMaxWalNumber + 1));
 }
 
 class WalSetTest : public DBTestBase {
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index fe90a07960..da1098e274 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -2066,6 +2066,7 @@ void StressTest::Open() {
         FLAGS_level_compaction_dynamic_level_bytes;
     options_.file_checksum_gen_factory =
         GetFileChecksumImpl(FLAGS_file_checksum_impl);
+    options_.track_and_verify_wals_in_manifest = true;
   } else {
 #ifdef ROCKSDB_LITE
     fprintf(stderr, "--options_file not supported in lite mode\n");