diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index f15e6b62e5..ee79eb9f00 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -191,8 +191,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
 #endif  // !ROCKSDB_LITE
 
   // Supported wal compression types
-  if (result.wal_compression != kNoCompression &&
-      result.wal_compression != kZSTD) {
+  if (!StreamingCompressionTypeSupported(result.wal_compression)) {
     result.wal_compression = kNoCompression;
     ROCKS_LOG_WARN(result.info_log,
                    "wal_compression is disabled since only zstd is supported");
@@ -1586,7 +1585,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
         tmp_set.Contains(FileType::kWalFile)));
     *new_log = new log::Writer(std::move(file_writer), log_file_num,
                                immutable_db_options_.recycle_log_file_num > 0,
-                               immutable_db_options_.manual_wal_flush);
+                               immutable_db_options_.manual_wal_flush,
+                               immutable_db_options_.wal_compression);
+    io_s = (*new_log)->AddCompressionTypeRecord();
   }
   return io_s;
 }
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index c46db33625..8a61dadaf9 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -1280,9 +1280,12 @@ class RecoveryTestHelper {
       ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
                                            fname, file_options, &file_writer,
                                            nullptr));
-      current_log_writer.reset(
+      log::Writer* log_writer =
           new log::Writer(std::move(file_writer), current_log_number,
-                          db_options.recycle_log_file_num > 0));
+                          db_options.recycle_log_file_num > 0, false,
+                          db_options.wal_compression);
+      ASSERT_OK(log_writer->AddCompressionTypeRecord());
+      current_log_writer.reset(log_writer);
 
       WriteBatch batch;
       for (int i = 0; i < kKeysPerWALFile; i++) {
@@ -1351,9 +1354,9 @@ class RecoveryTestHelper {
   }
 };
 
-class DBWALTestWithParams
-    : public DBWALTestBase,
-      public ::testing::WithParamInterface<std::tuple<bool, int, int>> {
+class DBWALTestWithParams : public DBWALTestBase,
+                            public ::testing::WithParamInterface<
+                                std::tuple<bool, int, int, CompressionType>> {
  public:
   DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
 };
@@ -1364,12 +1367,14 @@ INSTANTIATE_TEST_CASE_P(
                        ::testing::Range(RecoveryTestHelper::kWALFileOffset,
                                         RecoveryTestHelper::kWALFileOffset +
                                             RecoveryTestHelper::kWALFilesCount,
-                                        1)));
+                                        1),
+                       ::testing::Values(CompressionType::kNoCompression,
+                                         CompressionType::kZSTD)));
 
 class DBWALTestWithParamsVaryingRecoveryMode
     : public DBWALTestBase,
       public ::testing::WithParamInterface<
-          std::tuple<bool, int, int, WALRecoveryMode>> {
+          std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> {
  public:
   DBWALTestWithParamsVaryingRecoveryMode()
       : DBWALTestBase("/db_wal_test_with_params_mode") {}
@@ -1386,7 +1391,9 @@ INSTANTIATE_TEST_CASE_P(
         ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
                           WALRecoveryMode::kAbsoluteConsistency,
                           WALRecoveryMode::kPointInTimeRecovery,
-                          WALRecoveryMode::kSkipAnyCorruptedRecords)));
+                          WALRecoveryMode::kSkipAnyCorruptedRecords),
+        ::testing::Values(CompressionType::kNoCompression,
+                          CompressionType::kZSTD)));
 
 // Test scope:
 // - We expect to open the data store when there is incomplete trailing writes
@@ -1432,6 +1439,9 @@ TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
   // Corruption offset position
   int corrupt_offset = std::get<1>(GetParam());
   int wal_file_id = std::get<2>(GetParam());  // WAL file
+  // WAL compression type
+  CompressionType compression_type = std::get<3>(GetParam());
+  options.wal_compression = compression_type;
 
   if (trunc && corrupt_offset == 0) {
     return;
@@ -1492,9 +1502,12 @@ TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {
   // Corruption offset position
   int corrupt_offset = std::get<1>(GetParam());
   int wal_file_id = std::get<2>(GetParam());  // WAL file
+  // WAL compression type
+  CompressionType compression_type = std::get<3>(GetParam());
 
   // Fill data for testing
   Options options = CurrentOptions();
+  options.wal_compression = compression_type;
   const size_t row_count = RecoveryTestHelper::FillData(this, &options);
 
   // Corrupt the wal
@@ -1543,9 +1556,12 @@ TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
   // Corruption offset position
   int corrupt_offset = std::get<1>(GetParam());
   int wal_file_id = std::get<2>(GetParam());  // WAL file
+  // WAL compression type
+  CompressionType compression_type = std::get<3>(GetParam());
 
   // Fill data for testing
   Options options = CurrentOptions();
+  options.wal_compression = compression_type;
   const size_t row_count = RecoveryTestHelper::FillData(this, &options);
 
   // Corrupt the WAL
@@ -1769,8 +1785,11 @@ TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
   int corrupt_offset = std::get<1>(GetParam());
   int wal_file_id = std::get<2>(GetParam());  // WAL file
   WALRecoveryMode recovery_mode = std::get<3>(GetParam());
+  // WAL compression type
+  CompressionType compression_type = std::get<4>(GetParam());
 
   options.wal_recovery_mode = recovery_mode;
+  options.wal_compression = compression_type;
   // Create corrupted WAL
   RecoveryTestHelper::FillData(this, &options);
   RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
diff --git a/db/log_format.h b/db/log_format.h
index c22e2b6bce..d397372f49 100644
--- a/db/log_format.h
+++ b/db/log_format.h
@@ -32,8 +32,11 @@ enum RecordType {
   kRecyclableFirstType = 6,
   kRecyclableMiddleType = 7,
   kRecyclableLastType = 8,
+
+  // Compression Type
+  kSetCompressionType = 9,
 };
-static const int kMaxRecordType = kRecyclableLastType;
+static const int kMaxRecordType = kSetCompressionType;
 
 static const unsigned int kBlockSize = 32768;
 
diff --git a/db/log_reader.cc b/db/log_reader.cc
index 84082e8ea8..d88eb08cb1 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -38,7 +38,10 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       last_record_offset_(0),
       end_of_buffer_offset_(0),
       log_number_(log_num),
-      recycled_(false) {}
+      recycled_(false),
+      first_record_read_(false),
+      compression_type_(kNoCompression),
+      compression_type_record_read_(false) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
@@ -79,6 +82,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         scratch->clear();
         *record = fragment;
         last_record_offset_ = prospective_record_offset;
+        first_record_read_ = true;
         return true;
 
       case kFirstType:
@@ -114,6 +118,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           scratch->append(fragment.data(), fragment.size());
           *record = Slice(*scratch);
           last_record_offset_ = prospective_record_offset;
+          first_record_read_ = true;
           return true;
         }
         break;
@@ -212,6 +217,30 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
 
+      case kSetCompressionType: {
+        if (compression_type_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "read multiple SetCompressionType records");
+        }
+        if (first_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "SetCompressionType not the first record");
+        }
+        prospective_record_offset = physical_record_offset;
+        scratch->clear();
+        last_record_offset_ = prospective_record_offset;
+        CompressionTypeRecord compression_record(kNoCompression);
+        Status s = compression_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(fragment.size(),
+                           "could not decode SetCompressionType record");
+        } else {
+          compression_type_ = compression_record.GetCompressionType();
+          compression_type_record_read_ = true;
+        }
+        break;
+      }
+
       default: {
         char buf[40];
         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
@@ -449,6 +478,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
         *record = fragment;
         prospective_record_offset = physical_record_offset;
         last_record_offset_ = prospective_record_offset;
+        first_record_read_ = true;
         in_fragmented_record_ = false;
         return true;
 
@@ -483,6 +513,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
           fragments_.clear();
           *record = Slice(*scratch);
           last_record_offset_ = prospective_record_offset;
+          first_record_read_ = true;
           in_fragmented_record_ = false;
           return true;
         }
@@ -512,6 +543,30 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
 
+      case kSetCompressionType: {
+        if (compression_type_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "read multiple SetCompressionType records");
+        }
+        if (first_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "SetCompressionType not the first record");
+        }
+        fragments_.clear();
+        prospective_record_offset = physical_record_offset;
+        last_record_offset_ = prospective_record_offset;
+        in_fragmented_record_ = false;
+        CompressionTypeRecord compression_record(kNoCompression);
+        Status s = compression_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(fragment.size(),
+                           "could not decode SetCompressionType record");
+        } else {
+          compression_type_ = compression_record.GetCompressionType();
+        }
+        break;
+      }
+
       default: {
         char buf[40];
         snprintf(buf, sizeof(buf), "unknown record type %u",
diff --git a/db/log_reader.h b/db/log_reader.h
index 1ddc4abdd7..cd2d795c85 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -8,14 +8,16 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #pragma once
-#include <memory>
 #include <stdint.h>
 
+#include <memory>
+
 #include "db/log_format.h"
 #include "file/sequence_file_reader.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
+#include "util/compression.h"
 
 namespace ROCKSDB_NAMESPACE {
 class Logger;
@@ -128,6 +130,13 @@ class Reader {
   // Whether this is a recycled log file
   bool recycled_;
 
+  // Whether the first record has been read or not.
+  bool first_record_read_;
+  // Type of compression used
+  CompressionType compression_type_;
+  // Track whether the compression type record has been read or not.
+  bool compression_type_record_read_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
diff --git a/db/log_test.cc b/db/log_test.cc
index 2e993d8f90..3723347c80 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -47,7 +47,8 @@ static std::string RandomSkewedString(int i, Random* rnd) {
 // Param type is tuple<int, bool>
 // get<0>(tuple): non-zero if recycling log, zero if regular log
 // get<1>(tuple): true if allow retry after read EOF, false otherwise
-class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
+class LogTest
+    : public ::testing::TestWithParam<std::tuple<int, bool, CompressionType>> {
  private:
   class StringSource : public FSSequentialFile {
    public:
@@ -143,23 +144,27 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
   test::StringSink* sink_;
   StringSource* source_;
   ReportCollector report_;
-  std::unique_ptr<Writer> writer_;
-  std::unique_ptr<Reader> reader_;
 
  protected:
+  std::unique_ptr<Writer> writer_;
+  std::unique_ptr<Reader> reader_;
   bool allow_retry_read_;
+  CompressionType compression_type_;
 
  public:
   LogTest()
       : reader_contents_(),
         sink_(new test::StringSink(&reader_contents_)),
         source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))),
-        allow_retry_read_(std::get<1>(GetParam())) {
+        allow_retry_read_(std::get<1>(GetParam())),
+        compression_type_(std::get<2>(GetParam())) {
     std::unique_ptr<FSWritableFile> sink_holder(sink_);
     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
         std::move(sink_holder), "" /* don't care */, FileOptions()));
-    writer_.reset(
-        new Writer(std::move(file_writer), 123, std::get<0>(GetParam())));
+    Writer* writer =
+        new Writer(std::move(file_writer), 123, std::get<0>(GetParam()), false,
+                   compression_type_);
+    writer_.reset(writer);
     std::unique_ptr<FSSequentialFile> source_holder(source_);
     std::unique_ptr<SequentialFileReader> file_reader(
         new SequentialFileReader(std::move(source_holder), "" /* file name */));
@@ -676,11 +681,11 @@ TEST_P(LogTest, Recycle) {
   ASSERT_EQ("EOF", Read());
 }
 
-INSTANTIATE_TEST_CASE_P(bool, LogTest,
-                        ::testing::Values(std::make_tuple(0, false),
-                                          std::make_tuple(0, true),
-                                          std::make_tuple(1, false),
-                                          std::make_tuple(1, true)));
+// Do NOT enable compression for this instantiation.
+INSTANTIATE_TEST_CASE_P(
+    Log, LogTest,
+    ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
+                       ::testing::Values(CompressionType::kNoCompression)));
 
 class RetriableLogTest : public ::testing::TestWithParam<int> {
  private:
@@ -892,6 +897,27 @@ TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
 
 INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
 
+class CompressionLogTest : public LogTest {
+ public:
+  Status SetupTestEnv() { return writer_->AddCompressionTypeRecord(); }
+};
+
+TEST_P(CompressionLogTest, Empty) {
+  ASSERT_OK(SetupTestEnv());
+  const bool compression_enabled =
+      std::get<2>(GetParam()) == kNoCompression ? false : true;
+  // If WAL compression is enabled, a record is added for the compression type
+  const int compression_record_size = compression_enabled ? kHeaderSize + 4 : 0;
+  ASSERT_EQ(compression_record_size, WrittenBytes());
+  ASSERT_EQ("EOF", Read());
+}
+
+INSTANTIATE_TEST_CASE_P(
+    Compression, CompressionLogTest,
+    ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
+                       ::testing::Values(CompressionType::kNoCompression,
+                                         CompressionType::kZSTD)));
+
 }  // namespace log
 }  // namespace ROCKSDB_NAMESPACE
 
diff --git a/db/log_writer.cc b/db/log_writer.cc
index e2e596596a..410c634cc4 100644
--- a/db/log_writer.cc
+++ b/db/log_writer.cc
@@ -19,12 +19,14 @@ namespace ROCKSDB_NAMESPACE {
 namespace log {
 
 Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
-               bool recycle_log_files, bool manual_flush)
+               bool recycle_log_files, bool manual_flush,
+               CompressionType compression_type)
     : dest_(std::move(dest)),
       block_offset_(0),
       log_number_(log_number),
       recycle_log_files_(recycle_log_files),
-      manual_flush_(manual_flush) {
+      manual_flush_(manual_flush),
+      compression_type_(compression_type) {
   for (int i = 0; i <= kMaxRecordType; i++) {
     char t = static_cast<char>(i);
     type_crc_[i] = crc32c::Value(&t, 1);
@@ -112,6 +114,31 @@ IOStatus Writer::AddRecord(const Slice& slice) {
   return s;
 }
 
+IOStatus Writer::AddCompressionTypeRecord() {
+  // Should be the first record
+  assert(block_offset_ == 0);
+
+  if (compression_type_ == kNoCompression) {
+    // No need to add a record
+    return IOStatus::OK();
+  }
+
+  CompressionTypeRecord record(compression_type_);
+  std::string encode;
+  record.EncodeTo(&encode);
+  IOStatus s =
+      EmitPhysicalRecord(kSetCompressionType, encode.data(), encode.size());
+  if (s.ok()) {
+    if (!manual_flush_) {
+      s = dest_->Flush();
+    }
+  } else {
+    // Disable compression if the record could not be added.
+    compression_type_ = kNoCompression;
+  }
+  return s;
+}
+
 bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); }
 
 IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
@@ -126,7 +153,7 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
   buf[6] = static_cast<char>(t);
 
   uint32_t crc = type_crc_[t];
-  if (t < kRecyclableFullType) {
+  if (t < kRecyclableFullType || t == kSetCompressionType) {
     // Legacy record format
     assert(block_offset_ + kHeaderSize + n <= kBlockSize);
     header_size = kHeaderSize;
diff --git a/db/log_writer.h b/db/log_writer.h
index 1a91b21994..8f60049db3 100644
--- a/db/log_writer.h
+++ b/db/log_writer.h
@@ -12,6 +12,7 @@
 #include <memory>
 
 #include "db/log_format.h"
+#include "rocksdb/compression_type.h"
 #include "rocksdb/io_status.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
@@ -72,7 +73,8 @@ class Writer {
   // "*dest" must remain live while this Writer is in use.
   explicit Writer(std::unique_ptr<WritableFileWriter>&& dest,
                   uint64_t log_number, bool recycle_log_files,
-                  bool manual_flush = false);
+                  bool manual_flush = false,
+                  CompressionType compressionType = kNoCompression);
   // No copying allowed
   Writer(const Writer&) = delete;
   void operator=(const Writer&) = delete;
@@ -80,6 +82,7 @@ class Writer {
   ~Writer();
 
   IOStatus AddRecord(const Slice& slice);
+  IOStatus AddCompressionTypeRecord();
 
   WritableFileWriter* file() { return dest_.get(); }
   const WritableFileWriter* file() const { return dest_.get(); }
@@ -108,6 +111,9 @@ class Writer {
   // If true, it does not flush after each write. Instead it relies on the upper
   // layer to manually does the flush by calling ::WriteBuffer()
   bool manual_flush_;
+
+  // Compression Type
+  CompressionType compression_type_;
 };
 
 }  // namespace log
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index cbbf4cfd0e..bc47ba4108 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1182,7 +1182,8 @@ struct DBOptions {
 
   // This feature is WORK IN PROGRESS
   // If enabled WAL records will be compressed before they are written.
-  // Only zstd is supported.
+  // Only zstd is supported. Compressed WAL records will be read in supported
+  // versions regardless of the wal_compression settings.
   CompressionType wal_compression = kNoCompression;
 
   // If true, RocksDB supports flushing multiple column families and committing
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index b4df6a4323..4d5cd5b05e 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -763,7 +763,7 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
 DEFINE_bool(manual_wal_flush, false,
             "If true, buffer WAL until buffer is full or a manual FlushWAL().");
 
-DEFINE_string(wal_compression, "string",
+DEFINE_string(wal_compression, "none",
               "Algorithm to use for WAL compression. none to disable.");
 static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_wal_compression_e =
     ROCKSDB_NAMESPACE::kNoCompression;
diff --git a/util/compression.h b/util/compression.h
index a58b2b575d..8f21807e63 100644
--- a/util/compression.h
+++ b/util/compression.h
@@ -514,6 +514,26 @@ inline bool ZSTDNotFinal_Supported() {
 #endif
 }
 
+inline bool ZSTD_Streaming_Supported() {
+#ifdef ZSTD
+  return ZSTD_versionNumber() >= 10300;
+#else
+  return false;
+#endif
+}
+
+inline bool StreamingCompressionTypeSupported(
+    CompressionType compression_type) {
+  switch (compression_type) {
+    case kNoCompression:
+      return true;
+    case kZSTD:
+      return ZSTD_Streaming_Supported();
+    default:
+      return false;
+  }
+}
+
 inline bool CompressionTypeSupported(CompressionType compression_type) {
   switch (compression_type) {
     case kNoCompression:
@@ -1535,4 +1555,42 @@ inline CacheAllocationPtr UncompressData(
   }
 }
 
+// Records the compression type for subsequent WAL records.
+class CompressionTypeRecord {
+ public:
+  explicit CompressionTypeRecord(CompressionType compression_type)
+      : compression_type_(compression_type) {}
+
+  CompressionType GetCompressionType() const { return compression_type_; }
+
+  inline void EncodeTo(std::string* dst) const {
+    assert(dst != nullptr);
+    PutFixed32(dst, compression_type_);
+  }
+
+  inline Status DecodeFrom(Slice* src) {
+    constexpr char class_name[] = "CompressionTypeRecord";
+
+    uint32_t val;
+    if (!GetFixed32(src, &val)) {
+      return Status::Corruption(class_name,
+                                "Error decoding WAL compression type");
+    }
+    CompressionType compression_type = static_cast<CompressionType>(val);
+    if (!StreamingCompressionTypeSupported(compression_type)) {
+      return Status::Corruption(class_name,
+                                "WAL compression type not supported");
+    }
+    compression_type_ = compression_type;
+    return Status::OK();
+  }
+
+  inline std::string DebugString() const {
+    return "compression_type: " + CompressionTypeToString(compression_type_);
+  }
+
+ private:
+  CompressionType compression_type_;
+};
+
 }  // namespace ROCKSDB_NAMESPACE