Fix Compaction Stats for Remote Compaction and Tiered Storage (#13464)

Summary:
## Background

Compaction statistics are collected at various levels across different classes and structs.

* `InternalStats::CompactionStats`: Per-level compaction stats within a job (may be collected at the subcompaction level and later aggregated to the compaction level)
* `InternalStats::CompactionStatsFull`: Contains two sets of per-level compaction stats - `output_level_stats` for the primary output level and `proximal_level_stats` for the proximal level. Proximal level statistics are only relevant when using Tiered Storage with the per-key placement feature enabled.
* `InternalStats::CompactionOutputsStats`: A simplified version of `InternalStats::CompactionStats` containing only a subset of its fields
* `CompactionJobStats`: Job-level compaction stats (may be collected at the subcompaction level and later aggregated to the compaction level)

Note that some fields in the job-level stats have no counterpart in the per-level stats, so the two do not map 1-to-1 today (see the sketch below).
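
To make the relationships concrete, here is a minimal sketch of how these structures nest. The field sets are heavily trimmed and illustrative only; the real structs carry many more members (the actual fields appear in the diff below).

```cpp
#include <cstdint>

// Simplified stand-ins for the stats structures described above.
// Field selections are illustrative, not the full RocksDB definitions.
struct CompactionStats {  // ~ InternalStats::CompactionStats (one level)
  uint64_t num_input_records = 0;
  uint64_t num_output_records = 0;
  int num_output_files = 0;
  uint64_t bytes_written = 0;
};

struct CompactionStatsFull {  // ~ InternalStats::CompactionStatsFull
  CompactionStats output_level_stats;    // primary output level
  bool has_proximal_level_output = false;
  CompactionStats proximal_level_stats;  // only with per-key placement
};

struct CompactionJobStats {  // ~ the public, job-level stats
  uint64_t num_input_records = 0;
  uint64_t num_output_records = 0;
  uint64_t num_output_files = 0;
  // ...plus fields (e.g. num_single_del_mismatch) that are populated
  // directly during compaction and have no per-level counterpart.
};
```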

## Issues

* In non-remote compactions, proximal level compaction statistics were not being aggregated into the job-level statistics, so job-level statistics were missing the proximal level stats for tiered storage compactions with the per-key-placement feature enabled.
* During remote compactions, proximal level compaction statistics were pre-aggregated into the job-level statistics on the remote side. However, per-level compaction statistics were not part of the serialized compaction result, so the primary host lost that information and was unable to populate `per_key_placement_comp_stats_` and `internal_stats_.proximal_level_stats` properly during installation.
* `TieredCompactionTest` was only checking `(expected stats > 0 && actual stats > 0)` instead of comparing actual values (see the sketch below)
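
The third issue is visible in the old `CompareStats` helper of `tiered_compaction_test` (see its removal in the diff below). A contrived, self-contained sketch of why the loose check can hide aggregation bugs:

```cpp
#include <cassert>
#include <cstdint>

// Loose check in the style of the old test helper: when a non-zero value is
// expected, any non-zero actual value passes.
bool LooseMatch(uint64_t actual, uint64_t expected) {
  return expected > 0 ? actual > 0 : actual == 0;
}

int main() {
  const uint64_t expected_records = 130;  // what the test author intended
  const uint64_t actual_records = 1;      // a badly mis-aggregated stat
  assert(LooseMatch(actual_records, expected_records));  // still "passes"
  assert(actual_records != expected_records);  // exact comparison catches it
  return 0;
}
```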

## Fixes

* Renamed the `InternalStats::CompactionStatsFull` member `compaction_stats_` to `internal_stats_` in `CompactionJob` for better readability
* Removed the usage of `InternalStats::CompactionOutputsStats` and consolidated it into `InternalStats::CompactionStats`.
* Remote compactions now include the internal stats in the serialized `CompactionServiceResult`; `output_level_stats` and `proximal_level_stats` are later propagated into the sub-compaction output stats accordingly (see the serialization sketch after this list).
* `CompactionJob::UpdateCompactionJobStats()` now takes `CompactionStatsFull` and aggregates the `proximal_level_stats` as well
* `TieredCompactionTest` now does actual value comparisons for input/output file counts and record counts. A follow-up is needed to do the same for bytes read/written.
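
As a rough sketch of the serialization change, the remote side now writes `internal_stats` into the result alongside the job-level stats, and the primary host recovers it on read. The helper functions below are hypothetical; `Read`/`Write` are the serialization hooks declared on `CompactionServiceResult` (an internal, non-public API):

```cpp
#include <string>

#include "db/compaction/compaction_job.h"  // declares CompactionServiceResult

using ROCKSDB_NAMESPACE::CompactionServiceResult;
using ROCKSDB_NAMESPACE::Status;

// Remote side: Write() now serializes internal_stats (output_level_stats and
// proximal_level_stats) alongside the job-level CompactionJobStats.
Status SerializeResult(CompactionServiceResult& result, std::string* payload) {
  return result.Write(payload);
}

// Primary side: Read() restores internal_stats, so the proximal level stats
// can be populated properly during result installation.
Status DeserializeResult(const std::string& payload,
                         CompactionServiceResult* result) {
  return CompactionServiceResult::Read(payload, result);
}
```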

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13464

Test Plan:
Unit Tests updated to verify stats

```
./compaction_service_test
./tiered_compaction_test
```

Reviewed By: pdillinger

Differential Revision: D71220393

Pulled By: jaykorean

fbshipit-source-id: ad70bffd9614ced683f90c7570a17def9b5c8f3f
Jay Huh 2025-03-18 16:28:18 -07:00 committed by Facebook GitHub Bot
parent 17ac19f2c4
commit cc487ba367
13 changed files with 783 additions and 361 deletions

db/compaction/compaction_job.cc

@ -147,7 +147,7 @@ CompactionJob::CompactionJob(
BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
int* bg_bottom_compaction_scheduled)
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
internal_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
mutable_db_options_copy_(mutable_db_options),
log_buffer_(log_buffer),
@ -155,7 +155,7 @@ CompactionJob::CompactionJob(
stats_(stats),
bottommost_level_(false),
write_hint_(Env::WLTH_NOT_SET),
compaction_job_stats_(compaction_job_stats),
job_stats_(compaction_job_stats),
job_id_(job_id),
dbname_(dbname),
db_id_(db_id),
@ -191,7 +191,7 @@ CompactionJob::CompactionJob(
extra_num_subcompaction_threads_reserved_(0),
bg_compaction_scheduled_(bg_compaction_scheduled),
bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
assert(compaction_job_stats_ != nullptr);
assert(job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
@ -240,9 +240,8 @@ void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
// to ensure GetThreadList() can always show them all together.
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
compaction_job_stats_->is_manual_compaction =
compaction->is_manual_compaction();
compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
job_stats_->is_manual_compaction = compaction->is_manual_compaction();
job_stats_->is_full_compaction = compaction->is_full_compaction();
}
void CompactionJob::Prepare(
@ -695,17 +694,17 @@ Status CompactionJob::Run() {
thread.join();
}
compaction_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
internal_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
for (auto& state : compact_->sub_compact_states) {
compaction_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
internal_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
state.RemoveLastEmptyOutput();
}
RecordTimeToHistogram(stats_, COMPACTION_TIME,
compaction_stats_.stats.micros);
internal_stats_.output_level_stats.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
compaction_stats_.stats.cpu_micros);
internal_stats_.output_level_stats.cpu_micros);
TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
@ -855,46 +854,54 @@ Status CompactionJob::Run() {
// compaction_service is set. We now know whether each sub_compaction was
// done remotely or not. Reset is_remote_compaction back to false and allow
// AggregateCompactionStats() to set the right value.
compaction_job_stats_->is_remote_compaction = false;
job_stats_->is_remote_compaction = false;
// Finish up all bookkeeping to unify the subcompaction results.
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
uint64_t num_input_range_del = 0;
bool ok = UpdateCompactionStats(&num_input_range_del);
// (Sub)compactions returned ok, do sanity check on the number of input keys.
if (status.ok() && ok && compaction_job_stats_->has_num_input_records) {
size_t ts_sz = compact_->compaction->column_family_data()
->user_comparator()
->timestamp_size();
// When trim_ts_ is non-empty, CompactionIterator takes
// HistoryTrimmingIterator as input iterator and sees a trimmed view of
// input keys. So the number of keys it processed is not suitable for
// verification here.
// TODO: support verification when trim_ts_ is non-empty.
if (!(ts_sz > 0 && !trim_ts_.empty())) {
assert(compaction_stats_.stats.num_input_records > 0);
// TODO: verify the number of range deletion entries.
uint64_t expected =
compaction_stats_.stats.num_input_records - num_input_range_del;
uint64_t actual = compaction_job_stats_->num_input_records;
if (expected != actual) {
char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch));
std::string msg =
"Compaction number of input keys does not match "
"number of keys processed. Expected " +
std::to_string(expected) + " but processed " +
std::to_string(actual) + ". Compaction summary: " + scratch;
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Compaction with status: %s",
compact_->compaction->column_family_data()->GetName().c_str(),
job_context_->job_id, msg.c_str());
if (db_options_.compaction_verify_record_count) {
status = Status::Corruption(msg);
compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
// For remote compactions, internal_stats_.output_level_stats were part of the
// compaction_result already. No need to re-update it.
if (job_stats_->is_remote_compaction == false) {
uint64_t num_input_range_del = 0;
bool ok = UpdateOutputLevelCompactionStats(&num_input_range_del);
// (Sub)compactions returned ok, do sanity check on the number of input
// keys.
if (status.ok() && ok && job_stats_->has_num_input_records) {
size_t ts_sz = compact_->compaction->column_family_data()
->user_comparator()
->timestamp_size();
// When trim_ts_ is non-empty, CompactionIterator takes
// HistoryTrimmingIterator as input iterator and sees a trimmed view of
// input keys. So the number of keys it processed is not suitable for
// verification here.
// TODO: support verification when trim_ts_ is non-empty.
if (!(ts_sz > 0 && !trim_ts_.empty())) {
assert(internal_stats_.output_level_stats.num_input_records > 0);
// TODO: verify the number of range deletion entries.
uint64_t expected =
internal_stats_.output_level_stats.num_input_records -
num_input_range_del;
uint64_t actual = job_stats_->num_input_records;
if (expected != actual) {
char scratch[2345];
compact_->compaction->Summary(scratch, sizeof(scratch));
std::string msg =
"Compaction number of input keys does not match "
"number of keys processed. Expected " +
std::to_string(expected) + " but processed " +
std::to_string(actual) + ". Compaction summary: " + scratch;
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Compaction with status: %s",
compact_->compaction->column_family_data()->GetName().c_str(),
job_context_->job_id, msg.c_str());
if (db_options_.compaction_verify_record_count) {
status = Status::Corruption(msg);
}
}
}
}
}
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
@ -916,7 +923,7 @@ Status CompactionJob::Install(bool* compaction_released) {
int output_level = compact_->compaction->output_level();
cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
compaction_stats_);
internal_stats_);
if (status.ok()) {
status = InstallCompactionResults(compaction_released);
@ -927,7 +934,7 @@ Status CompactionJob::Install(bool* compaction_released) {
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_.stats;
const auto& stats = internal_stats_.output_level_stats;
double read_write_amp = 0.0;
double write_amp = 0.0;
@ -993,19 +1000,21 @@ Status CompactionJob::Install(bool* compaction_released) {
blob_files.back()->GetBlobFileNumber());
}
if (compaction_stats_.has_proximal_level_output) {
if (internal_stats_.has_proximal_level_output) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] has Proximal Level output: %" PRIu64
", level %d, number of files: %" PRIu64
", number of records: %" PRIu64,
column_family_name.c_str(),
compaction_stats_.proximal_level_stats.bytes_written,
internal_stats_.proximal_level_stats.bytes_written,
compact_->compaction->GetProximalLevel(),
compaction_stats_.proximal_level_stats.num_output_files,
compaction_stats_.proximal_level_stats.num_output_records);
internal_stats_.proximal_level_stats.num_output_files,
internal_stats_.proximal_level_stats.num_output_records);
}
UpdateCompactionJobStats(stats);
UpdateCompactionJobStats(internal_stats_);
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Install:AfterUpdateCompactionJobStats", job_stats_);
auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
stream << "job" << job_id_ << "event" << "compaction_finished"
@ -1027,17 +1036,16 @@ Status CompactionJob::Install(bool* compaction_released) {
<< CompressionTypeToString(compact_->compaction->output_compression());
stream << "num_single_delete_mismatches"
<< compaction_job_stats_->num_single_del_mismatch;
<< job_stats_->num_single_del_mismatch;
stream << "num_single_delete_fallthrough"
<< compaction_job_stats_->num_single_del_fallthru;
<< job_stats_->num_single_del_fallthru;
if (measure_io_stats_) {
stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
stream << "file_range_sync_nanos"
<< compaction_job_stats_->file_range_sync_nanos;
stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
stream << "file_write_nanos" << job_stats_->file_write_nanos;
stream << "file_range_sync_nanos" << job_stats_->file_range_sync_nanos;
stream << "file_fsync_nanos" << job_stats_->file_fsync_nanos;
stream << "file_prepare_write_nanos"
<< compaction_job_stats_->file_prepare_write_nanos;
<< job_stats_->file_prepare_write_nanos;
}
stream << "lsm_state";
@ -1055,9 +1063,9 @@ Status CompactionJob::Install(bool* compaction_released) {
stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
}
if (compaction_stats_.has_proximal_level_output) {
if (internal_stats_.has_proximal_level_output) {
InternalStats::CompactionStats& pl_stats =
compaction_stats_.proximal_level_stats;
internal_stats_.proximal_level_stats;
stream << "proximal_level_num_output_files" << pl_stats.num_output_files;
stream << "proximal_level_bytes_written" << pl_stats.bytes_written;
stream << "proximal_level_num_output_records"
@ -1812,22 +1820,22 @@ Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
{
Compaction::InputLevelSummaryBuffer inputs_summary;
if (compaction_stats_.has_proximal_level_output) {
if (internal_stats_.has_proximal_level_output) {
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] [JOB %d] Compacted %s => output_to_proximal_level: %" PRIu64
" bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction->InputLevelSummary(&inputs_summary),
compaction_stats_.proximal_level_stats.bytes_written,
compaction_stats_.stats.bytes_written,
compaction_stats_.TotalBytesWritten());
internal_stats_.proximal_level_stats.bytes_written,
internal_stats_.output_level_stats.bytes_written,
internal_stats_.TotalBytesWritten());
} else {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
compaction->column_family_data()->GetName().c_str(),
job_id_, compaction->InputLevelSummary(&inputs_summary),
compaction_stats_.TotalBytesWritten());
internal_stats_.TotalBytesWritten());
}
}
@ -2087,12 +2095,13 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
}
} // namespace
bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
bool CompactionJob::UpdateOutputLevelCompactionStats(
uint64_t* num_input_range_del) {
assert(compact_);
Compaction* compaction = compact_->compaction;
compaction_stats_.stats.num_input_files_in_non_output_levels = 0;
compaction_stats_.stats.num_input_files_in_output_level = 0;
internal_stats_.output_level_stats.num_input_files_in_non_output_levels = 0;
internal_stats_.output_level_stats.num_input_files_in_output_level = 0;
bool has_error = false;
const ReadOptions read_options(Env::IOActivity::kCompaction);
@ -2104,13 +2113,14 @@ bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
size_t num_input_files = flevel->num_files;
uint64_t* bytes_read;
if (compaction->level(input_level) != compaction->output_level()) {
compaction_stats_.stats.num_input_files_in_non_output_levels +=
internal_stats_.output_level_stats.num_input_files_in_non_output_levels +=
static_cast<int>(num_input_files);
bytes_read = &compaction_stats_.stats.bytes_read_non_output_levels;
bytes_read =
&internal_stats_.output_level_stats.bytes_read_non_output_levels;
} else {
compaction_stats_.stats.num_input_files_in_output_level +=
internal_stats_.output_level_stats.num_input_files_in_output_level +=
static_cast<int>(num_input_files);
bytes_read = &compaction_stats_.stats.bytes_read_output_level;
bytes_read = &internal_stats_.output_level_stats.bytes_read_output_level;
}
for (size_t i = 0; i < num_input_files; ++i) {
const FileMetaData* file_meta = flevel->files[i].file_metadata;
@ -2130,7 +2140,8 @@ bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
has_error = true;
}
}
compaction_stats_.stats.num_input_records += file_input_entries;
internal_stats_.output_level_stats.num_input_records +=
file_input_entries;
if (num_input_range_del) {
*num_input_range_del += file_num_range_del;
}
@ -2141,62 +2152,116 @@ bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) {
size_t num_filtered_input_files = filtered_flevel.size();
uint64_t* bytes_skipped;
if (compaction->level(input_level) != compaction->output_level()) {
compaction_stats_.stats.num_filtered_input_files_in_non_output_levels +=
internal_stats_.output_level_stats
.num_filtered_input_files_in_non_output_levels +=
static_cast<int>(num_filtered_input_files);
bytes_skipped = &compaction_stats_.stats.bytes_skipped_non_output_levels;
bytes_skipped =
&internal_stats_.output_level_stats.bytes_skipped_non_output_levels;
} else {
compaction_stats_.stats.num_filtered_input_files_in_output_level +=
internal_stats_.output_level_stats
.num_filtered_input_files_in_output_level +=
static_cast<int>(num_filtered_input_files);
bytes_skipped = &compaction_stats_.stats.bytes_skipped_output_level;
bytes_skipped =
&internal_stats_.output_level_stats.bytes_skipped_output_level;
}
for (const FileMetaData* filtered_file_meta : filtered_flevel) {
*bytes_skipped += filtered_file_meta->fd.GetFileSize();
}
}
assert(compaction_job_stats_);
compaction_stats_.stats.bytes_read_blob =
compaction_job_stats_->total_blob_bytes_read;
assert(job_stats_);
internal_stats_.output_level_stats.bytes_read_blob =
job_stats_->total_blob_bytes_read;
compaction_stats_.stats.num_dropped_records =
compaction_stats_.DroppedRecords();
internal_stats_.output_level_stats.num_dropped_records =
internal_stats_.DroppedRecords();
return !has_error;
}
void CompactionJob::UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const {
compaction_job_stats_->elapsed_micros = stats.micros;
const InternalStats::CompactionStatsFull& internal_stats) const {
assert(job_stats_);
job_stats_->elapsed_micros = internal_stats.output_level_stats.micros;
job_stats_->cpu_micros = internal_stats.output_level_stats.cpu_micros;
// input information
compaction_job_stats_->total_input_bytes =
stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
compaction_job_stats_->num_input_records = stats.num_input_records;
compaction_job_stats_->num_input_files =
stats.num_input_files_in_non_output_levels +
stats.num_input_files_in_output_level;
compaction_job_stats_->num_input_files_at_output_level =
stats.num_input_files_in_output_level;
compaction_job_stats_->num_filtered_input_files =
stats.num_filtered_input_files_in_non_output_levels +
stats.num_filtered_input_files_in_output_level;
compaction_job_stats_->num_filtered_input_files_at_output_level =
stats.num_filtered_input_files_in_output_level;
compaction_job_stats_->total_skipped_input_bytes =
stats.bytes_skipped_non_output_levels + stats.bytes_skipped_output_level;
job_stats_->total_input_bytes =
internal_stats.output_level_stats.bytes_read_non_output_levels +
internal_stats.output_level_stats.bytes_read_output_level;
job_stats_->num_input_records =
internal_stats.output_level_stats.num_input_records;
job_stats_->num_input_files =
internal_stats.output_level_stats.num_input_files_in_non_output_levels +
internal_stats.output_level_stats.num_input_files_in_output_level;
job_stats_->num_input_files_at_output_level =
internal_stats.output_level_stats.num_input_files_in_output_level;
job_stats_->num_filtered_input_files =
internal_stats.output_level_stats
.num_filtered_input_files_in_non_output_levels +
internal_stats.output_level_stats
.num_filtered_input_files_in_output_level;
job_stats_->num_filtered_input_files_at_output_level =
internal_stats.output_level_stats
.num_filtered_input_files_in_output_level;
job_stats_->total_skipped_input_bytes =
internal_stats.output_level_stats.bytes_skipped_non_output_levels +
internal_stats.output_level_stats.bytes_skipped_output_level;
// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
compaction_job_stats_->num_output_records = stats.num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
job_stats_->total_output_bytes =
internal_stats.output_level_stats.bytes_written;
job_stats_->total_output_bytes_blob =
internal_stats.output_level_stats.bytes_written_blob;
job_stats_->num_output_records =
internal_stats.output_level_stats.num_output_records;
job_stats_->num_output_files =
internal_stats.output_level_stats.num_output_files;
job_stats_->num_output_files_blob =
internal_stats.output_level_stats.num_output_files_blob;
if (stats.num_output_files > 0) {
// If proximal level output exists
if (internal_stats.has_proximal_level_output) {
job_stats_->total_input_bytes +=
internal_stats.proximal_level_stats.bytes_read_non_output_levels +
internal_stats.proximal_level_stats.bytes_read_output_level;
job_stats_->num_input_records +=
internal_stats.proximal_level_stats.num_input_records;
job_stats_->num_input_files +=
internal_stats.proximal_level_stats
.num_input_files_in_non_output_levels +
internal_stats.proximal_level_stats.num_input_files_in_output_level;
job_stats_->num_input_files_at_output_level +=
internal_stats.proximal_level_stats.num_input_files_in_output_level;
job_stats_->num_filtered_input_files +=
internal_stats.proximal_level_stats
.num_filtered_input_files_in_non_output_levels +
internal_stats.proximal_level_stats
.num_filtered_input_files_in_output_level;
job_stats_->num_filtered_input_files_at_output_level +=
internal_stats.proximal_level_stats
.num_filtered_input_files_in_output_level;
job_stats_->total_skipped_input_bytes +=
internal_stats.proximal_level_stats.bytes_skipped_non_output_levels +
internal_stats.proximal_level_stats.bytes_skipped_output_level;
job_stats_->total_output_bytes +=
internal_stats.proximal_level_stats.bytes_written;
job_stats_->total_output_bytes_blob +=
internal_stats.proximal_level_stats.bytes_written_blob;
job_stats_->num_output_records +=
internal_stats.proximal_level_stats.num_output_records;
job_stats_->num_output_files +=
internal_stats.proximal_level_stats.num_output_files;
job_stats_->num_output_files_blob +=
internal_stats.proximal_level_stats.num_output_files_blob;
}
if (job_stats_->num_output_files > 0) {
CopyPrefix(compact_->SmallestUserKey(),
CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->smallest_output_key_prefix);
&job_stats_->smallest_output_key_prefix);
CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
&compaction_job_stats_->largest_output_key_prefix);
&job_stats_->largest_output_key_prefix);
}
}

db/compaction/compaction_job.h

@ -67,7 +67,7 @@ class SubcompactionState;
// if needed.
//
// CompactionJob has 2 main stats:
// 1. CompactionJobStats compaction_job_stats_
// 1. CompactionJobStats job_stats_
// CompactionJobStats is a public data structure which is part of the
// Compaction event listener, through which RocksDB shares the job stats
// with the user.
// Internally it's an aggregation of all the compaction_job_stats from each
@ -81,7 +81,7 @@ class SubcompactionState;
// +------------------------+ |
// | CompactionJob | | +------------------------+
// | | | | SubcompactionState |
// | compaction_job_stats +-----+ | |
// | job_stats +-----+ | |
// | | +--------->| compaction_job_stats |
// | | | | |
// +------------------------+ | +------------------------+
@ -98,16 +98,13 @@ class SubcompactionState;
// +--------->+ |
// +------------------------+
//
// 2. CompactionStatsFull compaction_stats_
// 2. CompactionStatsFull internal_stats_
// `CompactionStatsFull` is an internal stats about the compaction, which
// is eventually sent to `ColumnFamilyData::internal_stats_` and used for
// logging and public metrics.
// Internally, it's an aggregation of stats_ from each `SubcompactionState`.
// It has 2 parts, normal stats about the main compaction information and
// the proximal level output stats.
// `SubcompactionState` maintains the CompactionOutputs for ordinary level
// output and the proximal level output if exists, the per_level stats is
// stored with the outputs.
// It has 2 parts, ordinary output level stats and the proximal level output
// stats.
// +---------------------------+
// | SubcompactionState |
// | |
@ -121,9 +118,9 @@ class SubcompactionState;
// +--------------------------------+ | | | CompactionOutputs | |
// | CompactionJob | | | | (proximal_level) | |
// | | +--------->| stats_ | |
// | compaction_stats_ | | | | +----------------------+ |
// | internal_stats_ | | | | +----------------------+ |
// | +-------------------------+ | | | | |
// | |stats (normal) |------|----+ +---------------------------+
// | |output_level_stats |------|----+ +---------------------------+
// | +-------------------------+ | | |
// | | | |
// | +-------------------------+ | | | +---------------------------+
@ -199,7 +196,7 @@ class CompactionJob {
IOStatus io_status() const { return io_status_; }
protected:
// Update the following stats in compaction_stats_.stats
// Update the following stats in internal_stats_.output_level_stats
// - num_input_files_in_non_output_levels
// - num_input_files_in_output_level
// - bytes_read_non_output_levels
@ -211,11 +208,12 @@ class CompactionJob {
// @param num_input_range_del if non-null, will be set to the number of range
// deletion entries in this compaction input.
//
// Returns true iff compaction_stats_.stats.num_input_records and
// Returns true iff internal_stats_.output_level_stats.num_input_records and
// num_input_range_del are calculated successfully.
bool UpdateCompactionStats(uint64_t* num_input_range_del = nullptr);
virtual void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
bool UpdateOutputLevelCompactionStats(
uint64_t* num_input_range_del = nullptr);
void UpdateCompactionJobStats(
const InternalStats::CompactionStatsFull& internal_stats) const;
void LogCompaction();
virtual void RecordCompactionIOStats();
void CleanupCompaction();
@ -224,7 +222,7 @@ class CompactionJob {
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
CompactionState* compact_;
InternalStats::CompactionStatsFull compaction_stats_;
InternalStats::CompactionStatsFull internal_stats_;
const ImmutableDBOptions& db_options_;
const MutableDBOptions mutable_db_options_copy_;
LogBuffer* log_buffer_;
@ -237,7 +235,7 @@ class CompactionJob {
IOStatus io_status_;
CompactionJobStats* compaction_job_stats_;
CompactionJobStats* job_stats_;
private:
friend class CompactionJobTestBase;
@ -475,8 +473,21 @@ struct CompactionServiceResult {
uint64_t bytes_read = 0;
uint64_t bytes_written = 0;
// Job-level Compaction Stats.
//
// NOTE: Job-level stats cannot be rebuilt from scratch by simply aggregating
// per-level stats, because some fields are populated directly during
// compaction (e.g. RecordDroppedKeys()). This is why we need both job-level
// and per-level stats in the serialized result. If rebuilding job-level
// stats from per-level stats becomes possible in the future, consider
// deprecating this field.
CompactionJobStats stats;
// Per-level Compaction Stats for both output_level_stats and
// proximal_level_stats
InternalStats::CompactionStatsFull internal_stats;
// serialization interface to read and write the object
static Status Read(const std::string& data_str, CompactionServiceResult* obj);
Status Write(std::string* output);
@ -522,9 +533,6 @@ class CompactionServiceCompactionJob : private CompactionJob {
protected:
void RecordCompactionIOStats() override;
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const override;
private:
// Get table file name in output_path
std::string GetTableFileName(uint64_t file_number) override;

db/compaction/compaction_outputs.cc

@ -54,7 +54,7 @@ Status CompactionOutputs::Finish(
}
current_output().finished = true;
stats_.bytes_written += current_bytes;
stats_.num_output_files = outputs_.size();
stats_.num_output_files = static_cast<int>(outputs_.size());
return s;
}

db/compaction/compaction_outputs.h

@ -66,11 +66,6 @@ class CompactionOutputs {
file_writer_.reset(writer);
}
// TODO: Remove it when remote compaction support tiered compaction
void AddBytesWritten(uint64_t bytes) { stats_.bytes_written += bytes; }
void SetNumOutputRecords(uint64_t num) { stats_.num_output_records = num; }
void SetNumOutputFiles(uint64_t num) { stats_.num_output_files = num; }
// TODO: Move the BlobDB builder into CompactionOutputs
const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
if (is_proximal_level_) {
@ -103,7 +98,8 @@ class CompactionOutputs {
void UpdateBlobStats() {
assert(!is_proximal_level_);
stats_.num_output_files_blob = blob_file_additions_.size();
stats_.num_output_files_blob =
static_cast<int>(blob_file_additions_.size());
for (const auto& blob : blob_file_additions_) {
stats_.bytes_written_blob += blob.GetTotalBlobBytes();
}
@ -307,8 +303,8 @@ class CompactionOutputs {
std::vector<BlobFileAddition> blob_file_additions_;
std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
// Basic compaction output stats for this level's outputs
InternalStats::CompactionOutputsStats stats_;
// Per-level output stats
InternalStats::CompactionStats stats_;
// indicate if this CompactionOutputs obj for proximal_level, should always
// be false if per_key_placement feature is not enabled.

db/compaction/compaction_service_job.cc

@ -249,12 +249,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
false, true, file.paranoid_hash);
compaction_outputs->UpdateTableProperties(file.table_properties);
}
// Set per-level stats
auto compaction_output_stats =
sub_compact->OutputStats(false /* is_proximal_level */);
assert(compaction_output_stats);
compaction_output_stats->Add(
compaction_result.internal_stats.output_level_stats);
if (compaction->SupportsPerKeyPlacement()) {
compaction_output_stats =
sub_compact->OutputStats(true /* is_proximal_level */);
assert(compaction_output_stats);
compaction_output_stats->Add(
compaction_result.internal_stats.proximal_level_stats);
}
// Set job stats
sub_compact->compaction_job_stats = compaction_result.stats;
sub_compact->Current().SetNumOutputRecords(
compaction_result.stats.num_output_records);
sub_compact->Current().SetNumOutputFiles(
compaction_result.stats.num_output_files);
sub_compact->Current().AddBytesWritten(compaction_result.bytes_written);
RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
compaction_result.bytes_written);
@ -274,16 +286,6 @@ void CompactionServiceCompactionJob::RecordCompactionIOStats() {
CompactionJob::RecordCompactionIOStats();
}
void CompactionServiceCompactionJob::UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const {
// output information only in remote compaction
compaction_job_stats_->total_output_bytes += stats.bytes_written;
compaction_job_stats_->total_output_bytes_blob += stats.bytes_written_blob;
compaction_job_stats_->num_output_records += stats.num_output_records;
compaction_job_stats_->num_output_files += stats.num_output_files;
compaction_job_stats_->num_output_files_blob += stats.num_output_files_blob;
}
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
@ -345,15 +347,14 @@ Status CompactionServiceCompactionJob::Run() {
ProcessKeyValueCompaction(sub_compact);
compaction_job_stats_->elapsed_micros =
db_options_.clock->NowMicros() - start_micros;
compaction_job_stats_->cpu_micros =
sub_compact->compaction_job_stats.cpu_micros;
uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros;
internal_stats_.SetMicros(elapsed_micros);
internal_stats_.AddCpuMicros(elapsed_micros);
RecordTimeToHistogram(stats_, COMPACTION_TIME,
compaction_job_stats_->elapsed_micros);
internal_stats_.output_level_stats.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
compaction_job_stats_->cpu_micros);
internal_stats_.output_level_stats.cpu_micros);
Status status = sub_compact->status;
IOStatus io_s = sub_compact->io_status;
@ -383,28 +384,44 @@ Status CompactionServiceCompactionJob::Run() {
// Build Compaction Job Stats
// 1. Aggregate CompactionOutputStats into Internal Compaction Stats
// (compaction_stats_) and aggregate Compaction Job Stats
// (compaction_job_stats_) from the sub compactions
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
// 1. Aggregate internal stats and job stats for all subcompactions
// internal stats: sub_compact.proximal_level_outputs_.stats and
// sub_compact.compaction_outputs_.stats into
// internal_stats_.output_level_stats and
// internal_stats_.proximal_level_stats
// job-level stats: sub_compact.compaction_job_stats into compact.job_stats_
//
// For remote compaction, there's only one subcompaction.
compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
// 2. Update the Output information in the Compaction Job Stats with
// aggregated Internal Compaction Stats.
UpdateCompactionJobStats(compaction_stats_.stats);
if (compaction_stats_.has_proximal_level_output) {
UpdateCompactionJobStats(compaction_stats_.proximal_level_stats);
// 2. Update the following stats in internal_stats_.output_level_stats
// - num_input_files_in_non_output_levels
// - num_input_files_in_output_level
// - bytes_read_non_output_levels
// - bytes_read_output_level
// - num_input_records
// - bytes_read_blob
// - num_dropped_records
uint64_t num_input_range_del = 0;
const bool ok = UpdateOutputLevelCompactionStats(&num_input_range_del);
if (status.ok() && ok && job_stats_->has_num_input_records) {
// TODO(jaykorean) - verify record count
assert(job_stats_->num_input_records > 0);
}
// 3. Set fields that are not propagated as part of aggregations above
// 3. Update job-level stats with the aggregated internal_stats_
UpdateCompactionJobStats(internal_stats_);
// and set fields that are not propagated as part of the update
compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
compaction_result_->stats.is_full_compaction = c->is_full_compaction();
compaction_result_->stats.is_remote_compaction = true;
// 4. Update IO Stats that are not part of the aggregations above (bytes_read,
// bytes_written)
// 4. Update IO Stats that are not part of the update above
// (bytes_read, bytes_written)
RecordCompactionIOStats();
// Build Output
compaction_result_->internal_stats = internal_stats_;
compaction_result_->output_level = compact_->compaction->output_level();
compaction_result_->output_path = output_path_;
if (status.ok()) {
@ -724,6 +741,125 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_stats_type_info = {
{"micros",
{offsetof(struct InternalStats::CompactionStats, micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"cpu_micros",
{offsetof(struct InternalStats::CompactionStats, cpu_micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
bytes_read_non_output_levels),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read_output_level",
{offsetof(struct InternalStats::CompactionStats,
bytes_read_output_level),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_skipped_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
bytes_skipped_non_output_levels),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_skipped_output_level",
{offsetof(struct InternalStats::CompactionStats,
bytes_skipped_output_level),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read_blob",
{offsetof(struct InternalStats::CompactionStats, bytes_read_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written",
{offsetof(struct InternalStats::CompactionStats, bytes_written),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written_blob",
{offsetof(struct InternalStats::CompactionStats, bytes_written_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_moved",
{offsetof(struct InternalStats::CompactionStats, bytes_moved),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_in_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
num_input_files_in_non_output_levels),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_in_output_level",
{offsetof(struct InternalStats::CompactionStats,
num_input_files_in_output_level),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_filtered_input_files_in_non_output_levels",
{offsetof(struct InternalStats::CompactionStats,
num_filtered_input_files_in_non_output_levels),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_filtered_input_files_in_output_level",
{offsetof(struct InternalStats::CompactionStats,
num_filtered_input_files_in_output_level),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files",
{offsetof(struct InternalStats::CompactionStats, num_output_files),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files_blob",
{offsetof(struct InternalStats::CompactionStats,
num_output_files_blob),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_records",
{offsetof(struct InternalStats::CompactionStats, num_input_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_dropped_records",
{offsetof(struct InternalStats::CompactionStats, num_dropped_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_records",
{offsetof(struct InternalStats::CompactionStats, num_output_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"count",
{offsetof(struct InternalStats::CompactionStats, count),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"counts", OptionTypeInfo::Array<
int, static_cast<int>(CompactionReason::kNumOfReasons)>(
offsetof(struct InternalStats::CompactionStats, counts),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kInt})},
};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_internal_stats_type_info = {
{"output_level_stats",
OptionTypeInfo::Struct(
"output_level_stats", &compaction_stats_type_info,
offsetof(struct InternalStats::CompactionStatsFull,
output_level_stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"has_proximal_level_output",
{offsetof(struct InternalStats::CompactionStatsFull,
has_proximal_level_output),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"proximal_level_stats",
OptionTypeInfo::Struct(
"proximal_level_stats", &compaction_stats_type_info,
offsetof(struct InternalStats::CompactionStatsFull,
proximal_level_stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
};
namespace {
// this is a helper struct to serialize and deserialize class Status, because
// Status's members are not public.
@ -830,6 +966,11 @@ static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
"stats", &compaction_job_stats_type_info,
offsetof(struct CompactionServiceResult, stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"internal_stats",
OptionTypeInfo::Struct(
"internal_stats", &compaction_internal_stats_type_info,
offsetof(struct CompactionServiceResult, internal_stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
};
Status CompactionServiceInput::Read(const std::string& data_str,

db/compaction/compaction_service_test.cc

@ -357,11 +357,12 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
} else {
ASSERT_OK(result.status);
}
ASSERT_GE(result.stats.elapsed_micros, 1);
ASSERT_GE(result.stats.cpu_micros, 1);
ASSERT_GE(result.internal_stats.output_level_stats.micros, 1);
ASSERT_GE(result.internal_stats.output_level_stats.cpu_micros, 1);
ASSERT_EQ(20, result.stats.num_output_records);
ASSERT_EQ(result.output_files.size(), result.stats.num_output_files);
ASSERT_EQ(20, result.internal_stats.output_level_stats.num_output_records);
ASSERT_EQ(result.output_files.size(),
result.internal_stats.output_level_stats.num_output_files);
uint64_t total_size = 0;
for (auto output_file : result.output_files) {
@ -372,7 +373,7 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
ASSERT_GT(file_size, 0);
total_size += file_size;
}
ASSERT_EQ(total_size, result.stats.total_output_bytes);
ASSERT_EQ(total_size, result.internal_stats.TotalBytesWritten());
ASSERT_TRUE(result.stats.is_remote_compaction);
ASSERT_TRUE(result.stats.is_manual_compaction);
@ -1212,10 +1213,14 @@ TEST_F(CompactionServiceTest, PrecludeLastLevel) {
CompactionServiceResult result;
my_cs->GetResult(&result);
ASSERT_OK(result.status);
ASSERT_GT(result.stats.cpu_micros, 0);
ASSERT_GT(result.stats.elapsed_micros, 0);
ASSERT_EQ(result.stats.num_output_records, kNumTrigger * kNumKeys);
ASSERT_EQ(result.stats.num_output_files, 2);
ASSERT_GT(result.internal_stats.output_level_stats.cpu_micros, 0);
ASSERT_GT(result.internal_stats.output_level_stats.micros, 0);
ASSERT_EQ(result.internal_stats.output_level_stats.num_output_records +
result.internal_stats.proximal_level_stats.num_output_records,
kNumTrigger * kNumKeys);
ASSERT_EQ(result.internal_stats.output_level_stats.num_output_files +
result.internal_stats.proximal_level_stats.num_output_files,
2);
}
TEST_F(CompactionServiceTest, ConcurrentCompaction) {

db/compaction/compaction_state.cc

@ -36,11 +36,11 @@ Slice CompactionState::LargestUserKey() {
}
void CompactionState::AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats,
CompactionJobStats& compaction_job_stats) {
InternalStats::CompactionStatsFull& internal_stats,
CompactionJobStats& job_stats) {
for (const auto& sc : sub_compact_states) {
sc.AggregateCompactionOutputStats(compaction_stats);
compaction_job_stats.Add(sc.compaction_job_stats);
sc.AggregateCompactionOutputStats(internal_stats);
job_stats.Add(sc.compaction_job_stats);
}
}
} // namespace ROCKSDB_NAMESPACE

db/compaction/compaction_state.h

@ -29,8 +29,8 @@ class CompactionState {
Status status;
void AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats,
CompactionJobStats& compaction_job_stats);
InternalStats::CompactionStatsFull& internal_stats,
CompactionJobStats& job_stats);
explicit CompactionState(Compaction* c) : compaction(c) {}

db/compaction/subcompaction_state.cc

@ -14,7 +14,7 @@
namespace ROCKSDB_NAMESPACE {
void SubcompactionState::AggregateCompactionOutputStats(
InternalStats::CompactionStatsFull& compaction_stats) const {
InternalStats::CompactionStatsFull& internal_stats) const {
// Outputs should be closed. By extension, any files created just for
// range deletes have already been written also.
assert(compaction_outputs_.HasBuilder() == false);
@ -26,10 +26,10 @@ void SubcompactionState::AggregateCompactionOutputStats(
// assert(proximal_level_outputs_.stats_.num_output_files ==
// proximal_level_outputs_.outputs_.size());
compaction_stats.stats.Add(compaction_outputs_.stats_);
internal_stats.output_level_stats.Add(compaction_outputs_.stats_);
if (proximal_level_outputs_.HasOutput()) {
compaction_stats.has_proximal_level_output = true;
compaction_stats.proximal_level_stats.Add(proximal_level_outputs_.stats_);
internal_stats.has_proximal_level_output = true;
internal_stats.proximal_level_stats.Add(proximal_level_outputs_.stats_);
}
}

db/compaction/subcompaction_state.h

@ -161,7 +161,7 @@ class SubcompactionState {
void Cleanup(Cache* cache);
void AggregateCompactionOutputStats(
InternalStats::CompactionStatsFull& compaction_stats) const;
InternalStats::CompactionStatsFull& internal_stats) const;
CompactionOutputs& Current() const {
assert(current_outputs_);
@ -177,6 +177,16 @@ class SubcompactionState {
return &compaction_outputs_;
}
// Per-level stats for the output
InternalStats::CompactionStats* OutputStats(bool is_proximal_level) {
assert(compaction);
if (is_proximal_level) {
assert(compaction->SupportsPerKeyPlacement());
return &proximal_level_outputs_.stats_;
}
return &compaction_outputs_.stats_;
}
CompactionRangeDelAggregator* RangeDelAgg() const {
return range_del_agg_.get();
}

db/compaction/tiered_compaction_test.cc

@ -33,42 +33,13 @@ ConfigOptions GetStrictConfigOptions() {
class TieredCompactionTest : public DBTestBase {
public:
TieredCompactionTest()
: DBTestBase("tiered_compaction_test", /*env_do_fsync=*/true),
kBasicCompStats(CompactionReason::kUniversalSizeAmplification, 1),
kBasicPerKeyPlacementCompStats(
CompactionReason::kUniversalSizeAmplification, 1),
kBasicFlushStats(CompactionReason::kFlush, 1) {
kBasicCompStats.micros = kHasValue;
kBasicCompStats.cpu_micros = kHasValue;
kBasicCompStats.bytes_read_non_output_levels = kHasValue;
kBasicCompStats.num_input_files_in_non_output_levels = kHasValue;
kBasicCompStats.num_input_records = kHasValue;
kBasicCompStats.num_dropped_records = kHasValue;
kBasicPerLevelStats.num_output_records = kHasValue;
kBasicPerLevelStats.bytes_written = kHasValue;
kBasicPerLevelStats.num_output_files = kHasValue;
kBasicPerKeyPlacementCompStats.micros = kHasValue;
kBasicPerKeyPlacementCompStats.cpu_micros = kHasValue;
kBasicPerKeyPlacementCompStats.Add(kBasicPerLevelStats);
kBasicFlushStats.micros = kHasValue;
kBasicFlushStats.cpu_micros = kHasValue;
kBasicFlushStats.bytes_written = kHasValue;
kBasicFlushStats.num_output_files = kHasValue;
}
: DBTestBase("tiered_compaction_test", /*env_do_fsync=*/true) {}
protected:
static constexpr uint8_t kHasValue = 1;
InternalStats::CompactionStats kBasicCompStats;
InternalStats::CompactionStats kBasicPerKeyPlacementCompStats;
InternalStats::CompactionOutputsStats kBasicPerLevelStats;
InternalStats::CompactionStats kBasicFlushStats;
std::atomic_bool enable_per_key_placement = true;
CompactionJobStats job_stats;
void SetUp() override {
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
@ -108,21 +79,35 @@ class TieredCompactionTest : public DBTestBase {
// Verify the compaction stats, the stats are roughly compared
void VerifyCompactionStats(
const std::vector<InternalStats::CompactionStats>& expect_stats,
const InternalStats::CompactionStats& expect_pl_stats) {
const std::vector<InternalStats::CompactionStats>& expected_stats,
const InternalStats::CompactionStats& expected_pl_stats,
size_t output_level) {
const std::vector<InternalStats::CompactionStats>& stats =
GetCompactionStats();
const size_t kLevels = expect_stats.size();
const size_t kLevels = expected_stats.size();
ASSERT_EQ(kLevels, stats.size());
ASSERT_TRUE(output_level < kLevels);
for (auto it = stats.begin(), expect = expect_stats.begin();
it != stats.end(); it++, expect++) {
VerifyCompactionStats(*it, *expect);
for (size_t level = 0; level < kLevels; level++) {
VerifyCompactionStats(stats[level], expected_stats[level]);
}
const InternalStats::CompactionStats& pl_stats =
GetPerKeyPlacementCompactionStats();
VerifyCompactionStats(pl_stats, expect_pl_stats);
VerifyCompactionStats(pl_stats, expected_pl_stats);
const auto& output_level_stats = stats[output_level];
CompactionJobStats expected_job_stats;
expected_job_stats.cpu_micros = output_level_stats.cpu_micros;
expected_job_stats.num_input_files =
output_level_stats.num_input_files_in_output_level +
output_level_stats.num_input_files_in_non_output_levels;
expected_job_stats.num_input_records = output_level_stats.num_input_records;
expected_job_stats.num_output_files =
output_level_stats.num_output_files + pl_stats.num_output_files;
expected_job_stats.num_output_records =
output_level_stats.num_output_records + pl_stats.num_output_records;
VerifyCompactionJobStats(job_stats, expected_job_stats);
}
void ResetAllStats(std::vector<InternalStats::CompactionStats>& stats,
@ -139,42 +124,52 @@ class TieredCompactionTest : public DBTestBase {
}
private:
void CompareStats(uint64_t val, uint64_t expect) {
if (expect > 0) {
ASSERT_TRUE(val > 0);
} else {
ASSERT_EQ(val, 0);
}
}
void VerifyCompactionStats(
const InternalStats::CompactionStats& stats,
const InternalStats::CompactionStats& expect_stats) {
CompareStats(stats.micros, expect_stats.micros);
CompareStats(stats.cpu_micros, expect_stats.cpu_micros);
CompareStats(stats.bytes_read_non_output_levels,
expect_stats.bytes_read_non_output_levels);
CompareStats(stats.bytes_read_output_level,
expect_stats.bytes_read_output_level);
CompareStats(stats.bytes_read_blob, expect_stats.bytes_read_blob);
CompareStats(stats.bytes_written, expect_stats.bytes_written);
CompareStats(stats.bytes_moved, expect_stats.bytes_moved);
CompareStats(stats.num_input_files_in_non_output_levels,
expect_stats.num_input_files_in_non_output_levels);
CompareStats(stats.num_input_files_in_output_level,
expect_stats.num_input_files_in_output_level);
CompareStats(stats.num_output_files, expect_stats.num_output_files);
CompareStats(stats.num_output_files_blob,
expect_stats.num_output_files_blob);
CompareStats(stats.num_input_records, expect_stats.num_input_records);
CompareStats(stats.num_dropped_records, expect_stats.num_dropped_records);
CompareStats(stats.num_output_records, expect_stats.num_output_records);
ASSERT_EQ(stats.micros > 0, expect_stats.micros > 0);
ASSERT_EQ(stats.cpu_micros > 0, expect_stats.cpu_micros > 0);
// Hard to get consistent byte sizes of SST files.
// Use ASSERT_NEAR for comparison
ASSERT_NEAR(stats.bytes_read_non_output_levels * 1.0f,
expect_stats.bytes_read_non_output_levels * 1.0f,
stats.bytes_read_non_output_levels * 0.5f);
ASSERT_NEAR(stats.bytes_read_output_level * 1.0f,
expect_stats.bytes_read_output_level * 1.0f,
stats.bytes_read_output_level * 0.5f);
ASSERT_NEAR(stats.bytes_read_blob * 1.0f,
expect_stats.bytes_read_blob * 1.0f,
stats.bytes_read_blob * 0.5f);
ASSERT_NEAR(stats.bytes_written * 1.0f, expect_stats.bytes_written * 1.0f,
stats.bytes_written * 0.5f);
ASSERT_EQ(stats.bytes_moved, expect_stats.bytes_moved);
ASSERT_EQ(stats.num_input_files_in_non_output_levels,
expect_stats.num_input_files_in_non_output_levels);
ASSERT_EQ(stats.num_input_files_in_output_level,
expect_stats.num_input_files_in_output_level);
ASSERT_EQ(stats.num_output_files, expect_stats.num_output_files);
ASSERT_EQ(stats.num_output_files_blob, expect_stats.num_output_files_blob);
ASSERT_EQ(stats.num_input_records, expect_stats.num_input_records);
ASSERT_EQ(stats.num_dropped_records, expect_stats.num_dropped_records);
ASSERT_EQ(stats.num_output_records, expect_stats.num_output_records);
ASSERT_EQ(stats.count, expect_stats.count);
for (int i = 0; i < static_cast<int>(CompactionReason::kNumOfReasons);
i++) {
ASSERT_EQ(stats.counts[i], expect_stats.counts[i]);
}
}
void VerifyCompactionJobStats(const CompactionJobStats& stats,
const CompactionJobStats& expected_stats) {
ASSERT_EQ(stats.cpu_micros, expected_stats.cpu_micros);
ASSERT_EQ(stats.num_input_files, expected_stats.num_input_files);
ASSERT_EQ(stats.num_input_records, expected_stats.num_input_records);
ASSERT_EQ(stats.num_output_files, expected_stats.num_output_files);
ASSERT_EQ(stats.num_output_records, expected_stats.num_output_records);
}
};
TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
@ -199,52 +194,135 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Install:AfterUpdateCompactionJobStats", [&](void* arg) {
job_stats.Reset();
job_stats.Add(*(static_cast<CompactionJobStats*>(arg)));
});
SyncPoint::GetInstance()->EnableProcessing();
std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels);
InternalStats::CompactionStats& last_stats = expect_stats[kLastLevel];
InternalStats::CompactionStats expect_pl_stats;
// Put keys in the following way to create overlaps
// First file from 0 ~ 99
// Second file from 10 ~ 109
// ...
size_t bytes_per_file = 1952;
int total_input_key_count = kNumTrigger * kNumKeys;
int total_output_key_count = 130; // 0 ~ 129
for (int i = 0; i < kNumTrigger; i++) {
for (int j = 0; j < kNumKeys; j++) {
ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
}
ASSERT_OK(Flush());
seq_history.emplace_back(dbfull()->GetLatestSequenceNumber());
expect_stats[0].Add(kBasicFlushStats);
InternalStats::CompactionStats flush_stats(CompactionReason::kFlush, 1);
flush_stats.cpu_micros = 1;
flush_stats.micros = 1;
flush_stats.bytes_written = bytes_per_file;
flush_stats.num_output_files = 1;
expect_stats[0].Add(flush_stats);
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// the proximal level file temperature is not cold, all data are output to
// the proximal level.
// the penultimate level file temperature is not cold, all data are output to
// the penultimate level.
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
// basic compaction stats are still counted to the last level
expect_stats[kLastLevel].Add(kBasicCompStats);
expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
uint64_t bytes_written_penultimate_level =
GetPerKeyPlacementCompactionStats().bytes_written;
VerifyCompactionStats(expect_stats, expect_pl_stats);
// TODO - Use designated initializer when c++20 support is required
{
InternalStats::CompactionStats last_level_compaction_stats(
CompactionReason::kUniversalSizeAmplification, 1);
last_level_compaction_stats.cpu_micros = 1;
last_level_compaction_stats.micros = 1;
last_level_compaction_stats.bytes_written = 0;
last_level_compaction_stats.bytes_read_non_output_levels =
bytes_per_file * kNumTrigger;
last_level_compaction_stats.num_input_files_in_non_output_levels =
kNumTrigger;
last_level_compaction_stats.num_input_records = total_input_key_count;
last_level_compaction_stats.num_dropped_records =
total_input_key_count - total_output_key_count;
last_level_compaction_stats.num_output_records = 0;
last_level_compaction_stats.num_output_files = 0;
expect_stats[kLastLevel].Add(last_level_compaction_stats);
}
{
InternalStats::CompactionStats penultimate_level_compaction_stats(
CompactionReason::kUniversalSizeAmplification, 1);
penultimate_level_compaction_stats.cpu_micros = 1;
penultimate_level_compaction_stats.micros = 1;
penultimate_level_compaction_stats.bytes_written =
bytes_written_penultimate_level;
penultimate_level_compaction_stats.num_output_files = 1;
penultimate_level_compaction_stats.num_output_records =
total_output_key_count;
expect_pl_stats.Add(penultimate_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);
ResetAllStats(expect_stats, expect_pl_stats);
// move forward the cold_seq to split the file into 2 levels, so should have
// both the last level stats and the output_to_proximal_level stats
// both the last level stats and the penultimate level stats
latest_cold_seq = seq_history[0];
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
last_stats.Add(kBasicCompStats);
last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
last_stats.Add(kBasicPerLevelStats);
last_stats.num_dropped_records = 0;
expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
VerifyCompactionStats(expect_stats, expect_pl_stats);
// Now update the input count to be the total count from the previous
total_input_key_count = total_output_key_count;
int moved_to_last_level_key_count = 10;
// bytes read in non output = bytes written in penultimate level from previous
uint64_t bytes_read_in_non_output_level = bytes_written_penultimate_level;
uint64_t bytes_written_output_level =
GetCompactionStats()[kLastLevel].bytes_written;
// Now get the new bytes written in penultimate level
bytes_written_penultimate_level =
GetPerKeyPlacementCompactionStats().bytes_written;
{
InternalStats::CompactionStats last_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
last_level_compaction_stats.cpu_micros = 1;
last_level_compaction_stats.micros = 1;
last_level_compaction_stats.bytes_written = bytes_written_output_level;
last_level_compaction_stats.bytes_read_non_output_levels =
bytes_read_in_non_output_level;
last_level_compaction_stats.num_input_files_in_non_output_levels = 1;
last_level_compaction_stats.num_input_records = total_input_key_count;
last_level_compaction_stats.num_dropped_records =
total_input_key_count - total_output_key_count;
last_level_compaction_stats.num_output_records =
moved_to_last_level_key_count;
last_level_compaction_stats.num_output_files = 1;
expect_stats[kLastLevel].Add(last_level_compaction_stats);
}
{
InternalStats::CompactionStats penultimate_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
penultimate_level_compaction_stats.cpu_micros = 1;
penultimate_level_compaction_stats.micros = 1;
penultimate_level_compaction_stats.bytes_written =
bytes_written_penultimate_level;
penultimate_level_compaction_stats.num_output_files = 1;
penultimate_level_compaction_stats.num_output_records =
total_output_key_count - moved_to_last_level_key_count;
expect_pl_stats.Add(penultimate_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);
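
For readers tracking the counters, a minimal standalone sketch of the record-conservation invariant these expectations encode. Variable names mirror the test's locals; the concrete counts (130, 10) are illustrative placeholders, since the real values come from the elided setup above:

```
#include <cassert>
#include <cstdint>

int main() {
  // Illustrative placeholders; in the test, input == previous output,
  // so nothing is dropped in this pass.
  uint64_t total_input_key_count = 130;
  uint64_t total_output_key_count = 130;
  uint64_t moved_to_last_level_key_count = 10;

  uint64_t last_level_out = moved_to_last_level_key_count;
  uint64_t proximal_out =
      total_output_key_count - moved_to_last_level_key_count;
  uint64_t dropped = total_input_key_count - total_output_key_count;

  // Every input record lands on one of the two levels or is dropped.
  assert(last_level_out + proximal_out + dropped == total_input_key_count);
  return 0;
}
```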
// delete all cold data, so all data will be on proximal level
for (int i = 0; i < 10; i++) {
@@ -255,17 +333,54 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
ResetAllStats(expect_stats, expect_pl_stats);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
last_stats.Add(kBasicCompStats);
last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
last_stats.bytes_read_output_level = kHasValue;
last_stats.num_input_files_in_output_level = kHasValue;
expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
VerifyCompactionStats(expect_stats, expect_pl_stats);
// 10 tombstones added
total_input_key_count = total_input_key_count + 10;
total_output_key_count = total_output_key_count - 10;
auto last_level_stats = GetCompactionStats()[kLastLevel];
bytes_written_penultimate_level =
GetPerKeyPlacementCompactionStats().bytes_written;
ASSERT_LT(bytes_written_penultimate_level,
last_level_stats.bytes_read_non_output_levels +
last_level_stats.bytes_read_output_level);
{
InternalStats::CompactionStats last_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
last_level_compaction_stats.cpu_micros = 1;
last_level_compaction_stats.micros = 1;
last_level_compaction_stats.bytes_written = 0;
last_level_compaction_stats.bytes_read_non_output_levels =
last_level_stats.bytes_read_non_output_levels;
last_level_compaction_stats.bytes_read_output_level =
last_level_stats.bytes_read_output_level;
last_level_compaction_stats.num_input_files_in_non_output_levels = 2;
last_level_compaction_stats.num_input_files_in_output_level = 1;
last_level_compaction_stats.num_input_records = total_input_key_count;
last_level_compaction_stats.num_dropped_records =
total_input_key_count - total_output_key_count;
last_level_compaction_stats.num_output_records = 0;
last_level_compaction_stats.num_output_files = 0;
expect_stats[kLastLevel].Add(last_level_compaction_stats);
}
{
InternalStats::CompactionStats penultimate_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
penultimate_level_compaction_stats.cpu_micros = 1;
penultimate_level_compaction_stats.micros = 1;
penultimate_level_compaction_stats.bytes_written =
bytes_written_penultimate_level;
penultimate_level_compaction_stats.num_output_files = 1;
penultimate_level_compaction_stats.num_output_records =
total_output_key_count;
expect_pl_stats.Add(penultimate_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);
// move forward the cold_seq again with range delete, take a snapshot to keep
// the range dels in both cold and hot SSTs
@@ -283,12 +398,47 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
last_stats.Add(kBasicCompStats);
last_stats.Add(kBasicPerLevelStats);
last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
VerifyCompactionStats(expect_stats, expect_pl_stats);
// Previous output + one delete range
total_input_key_count = total_output_key_count + 1;
moved_to_last_level_key_count = 20;
last_level_stats = GetCompactionStats()[kLastLevel];
bytes_written_penultimate_level =
GetPerKeyPlacementCompactionStats().bytes_written;
// Expected to write more to the penultimate level than to the last level
ASSERT_GT(bytes_written_penultimate_level, last_level_stats.bytes_written);
{
InternalStats::CompactionStats last_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
last_level_compaction_stats.cpu_micros = 1;
last_level_compaction_stats.micros = 1;
last_level_compaction_stats.bytes_written = last_level_stats.bytes_written;
last_level_compaction_stats.bytes_read_non_output_levels =
last_level_stats.bytes_read_non_output_levels;
last_level_compaction_stats.bytes_read_output_level = 0;
last_level_compaction_stats.num_input_files_in_non_output_levels = 2;
last_level_compaction_stats.num_input_files_in_output_level = 0;
last_level_compaction_stats.num_input_records = total_input_key_count;
last_level_compaction_stats.num_dropped_records =
1; // delete range tombstone
last_level_compaction_stats.num_output_records =
moved_to_last_level_key_count;
last_level_compaction_stats.num_output_files = 1;
expect_stats[kLastLevel].Add(last_level_compaction_stats);
}
{
InternalStats::CompactionStats penultimate_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
penultimate_level_compaction_stats.cpu_micros = 1;
penultimate_level_compaction_stats.micros = 1;
penultimate_level_compaction_stats.bytes_written =
bytes_written_penultimate_level;
penultimate_level_compaction_stats.num_output_files = 1;
penultimate_level_compaction_stats.num_output_records =
total_input_key_count - moved_to_last_level_key_count - 1;
expect_pl_stats.Add(penultimate_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, kLastLevel);
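
The range-deletion step has slightly different accounting: the DeleteRange tombstone counts as one extra input record and, per the expectations above, as the single dropped record. A small sketch of that arithmetic, with placeholder counts and names mirroring the test's locals:

```
#include <cassert>
#include <cstdint>

int main() {
  // Placeholder for the surviving key count from the previous compaction.
  uint64_t prev_output_key_count = 130;
  // The DeleteRange tombstone adds one input record...
  uint64_t total_input_key_count = prev_output_key_count + 1;
  uint64_t moved_to_last_level_key_count = 20;
  // ...and is the single dropped record; the snapshot keeps the
  // range-deleted keys themselves alive in the outputs.
  uint64_t dropped = 1;
  uint64_t proximal_out =
      total_input_key_count - moved_to_last_level_key_count - dropped;
  // Conservation: inputs = last-level outputs + proximal outputs + dropped.
  assert(moved_to_last_level_key_count + proximal_out + dropped ==
         total_input_key_count);
  return 0;
}
```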
// verify data
std::string value;
@@ -341,11 +491,11 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
// This test was essentially for a hacked-up version of future functionality.
// It can be resurrected if/when a form of range-based tiering is properly
// implemented.
// TODO - Add stats verification when adding this test back
TEST_F(TieredCompactionTest, DISABLED_RangeBasedTieredStorageUniversal) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kLastLevel = kNumLevels - 1;
auto options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
@@ -371,7 +521,6 @@ TEST_F(TieredCompactionTest, DISABLED_RangeBasedTieredStorageUniversal) {
SyncPoint::GetInstance()->EnableProcessing();
std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels);
InternalStats::CompactionStats& last_stats = expect_stats[kLastLevel];
InternalStats::CompactionStats expect_pl_stats;
for (int i = 0; i < kNumTrigger; i++) {
@@ -379,18 +528,12 @@ TEST_F(TieredCompactionTest, DISABLED_RangeBasedTieredStorageUniversal) {
ASSERT_OK(Put(Key(j), "value" + std::to_string(j)));
}
ASSERT_OK(Flush());
expect_stats[0].Add(kBasicFlushStats);
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
last_stats.Add(kBasicCompStats);
last_stats.Add(kBasicPerLevelStats);
expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
VerifyCompactionStats(expect_stats, expect_pl_stats);
ResetAllStats(expect_stats, expect_pl_stats);
// change to all cold, no output_to_proximal_level output
@@ -404,14 +547,6 @@ TEST_F(TieredCompactionTest, DISABLED_RangeBasedTieredStorageUniversal) {
ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
last_stats.Add(kBasicCompStats);
last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
last_stats.Add(kBasicPerLevelStats);
last_stats.num_dropped_records = 0;
last_stats.bytes_read_output_level = kHasValue;
last_stats.num_input_files_in_output_level = kHasValue;
VerifyCompactionStats(expect_stats, expect_pl_stats);
// change to all hot; universal compaction supports moving data up a level if
// it's within the compaction level range.
{
@@ -890,6 +1025,8 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
const int kNumKeys = 100;
const int kLastLevel = kNumLevels - 1;
int output_level = 0;
auto options = CurrentOptions();
SetColdTemperature(options);
options.level0_file_num_compaction_trigger = kNumTrigger;
@@ -906,18 +1043,40 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
[&](void* arg) {
*static_cast<SequenceNumber*>(arg) = latest_cold_seq.load();
});
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Install:AfterUpdateCompactionJobStats", [&](void* arg) {
job_stats.Reset();
job_stats.Add(*(static_cast<CompactionJobStats*>(arg)));
});
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
output_level = compaction->output_level();
});
SyncPoint::GetInstance()->EnableProcessing();
std::vector<InternalStats::CompactionStats> expect_stats(kNumLevels);
InternalStats::CompactionStats& last_stats = expect_stats[kLastLevel];
InternalStats::CompactionStats expect_pl_stats;
// Put keys in the following way to create overlaps
// First file from 0 ~ 99
// Second file from 10 ~ 109
// ...
size_t bytes_per_file = 1952;
int total_input_key_count = kNumTrigger * kNumKeys;
int total_output_key_count = 130; // 0 ~ 129
for (int i = 0; i < kNumTrigger; i++) {
for (int j = 0; j < kNumKeys; j++) {
ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
}
ASSERT_OK(Flush());
expect_stats[0].Add(kBasicFlushStats);
InternalStats::CompactionStats flush_stats(CompactionReason::kFlush, 1);
flush_stats.cpu_micros = 1;
flush_stats.micros = 1;
flush_stats.bytes_written = bytes_per_file;
flush_stats.num_output_files = 1;
expect_stats[0].Add(flush_stats);
}
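
As a sanity check on the `total_output_key_count = 130` figure, a standalone sketch (not part of the test; constants mirrored from above) deriving the distinct-key count from the overlapping Put() pattern:

```
#include <cassert>
#include <set>

int main() {
  const int kNumTrigger = 4;  // mirrors the test constants
  const int kNumKeys = 100;
  std::set<int> distinct;         // union of all keys written
  int total_input_key_count = 0;  // one input record per Put()
  for (int i = 0; i < kNumTrigger; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      distinct.insert(i * 10 + j);  // file i spans [10*i, 10*i + 99]
      ++total_input_key_count;
    }
  }
  assert(total_input_key_count == 400);
  assert(distinct.size() == 130);  // keys 0..129 survive the merge
  // so num_dropped_records for this compaction is 400 - 130 == 270
  return 0;
}
```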
ASSERT_OK(dbfull()->TEST_WaitForCompact());
@@ -926,10 +1085,30 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
expect_stats[1].Add(kBasicCompStats);
expect_stats[1].Add(kBasicPerLevelStats);
expect_stats[1].ResetCompactionReason(CompactionReason::kLevelL0FilesNum);
VerifyCompactionStats(expect_stats, expect_pl_stats);
uint64_t bytes_written_output_level =
GetCompactionStats()[output_level].bytes_written;
ASSERT_GT(bytes_written_output_level, 0);
{
InternalStats::CompactionStats output_level_compaction_stats(
CompactionReason::kLevelL0FilesNum, 1);
output_level_compaction_stats.cpu_micros = 1;
output_level_compaction_stats.micros = 1;
output_level_compaction_stats.bytes_written = bytes_written_output_level;
output_level_compaction_stats.bytes_read_non_output_levels =
bytes_per_file * kNumTrigger;
output_level_compaction_stats.bytes_read_output_level = 0;
output_level_compaction_stats.num_input_files_in_non_output_levels =
kNumTrigger;
output_level_compaction_stats.num_input_files_in_output_level = 0;
output_level_compaction_stats.num_input_records = total_input_key_count;
output_level_compaction_stats.num_dropped_records =
total_input_key_count - total_output_key_count;
output_level_compaction_stats.num_output_records = total_output_key_count;
output_level_compaction_stats.num_output_files = 1;
expect_stats[output_level].Add(output_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, output_level);
// move all data to the last level
MoveFilesToLevel(kLastLevel);
@@ -944,15 +1123,26 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
last_stats.Add(kBasicCompStats);
last_stats.Add(kBasicPerLevelStats);
last_stats.num_dropped_records = 0;
last_stats.bytes_read_non_output_levels = 0;
last_stats.num_input_files_in_non_output_levels = 0;
last_stats.bytes_read_output_level = kHasValue;
last_stats.num_input_files_in_output_level = kHasValue;
last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
VerifyCompactionStats(expect_stats, expect_pl_stats);
total_input_key_count = total_output_key_count;
{
InternalStats::CompactionStats output_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
output_level_compaction_stats.cpu_micros = 1;
output_level_compaction_stats.micros = 1;
output_level_compaction_stats.bytes_written = bytes_written_output_level;
output_level_compaction_stats.bytes_read_non_output_levels = 0;
output_level_compaction_stats.bytes_read_output_level =
bytes_written_output_level;
output_level_compaction_stats.num_input_files_in_non_output_levels = 0;
output_level_compaction_stats.num_input_files_in_output_level = 1;
output_level_compaction_stats.num_input_records = total_input_key_count;
output_level_compaction_stats.num_dropped_records =
total_input_key_count - total_output_key_count;
output_level_compaction_stats.num_output_records = total_output_key_count;
output_level_compaction_stats.num_output_files = 1;
expect_stats[output_level].Add(output_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, output_level);
// Add new data, which is all hot and overwrites all existing data
latest_cold_seq = dbfull()->GetLatestSequenceNumber();
@@ -976,17 +1166,47 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
uint64_t bytes_written_in_proximal_level =
GetPerKeyPlacementCompactionStats().bytes_written;
for (int level = 2; level < kNumLevels - 1; level++) {
expect_stats[level].bytes_moved = kHasValue;
expect_stats[level].bytes_moved = bytes_written_in_proximal_level;
}
last_stats.Add(kBasicCompStats);
last_stats.bytes_read_output_level = kHasValue;
last_stats.num_input_files_in_output_level = kHasValue;
last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
VerifyCompactionStats(expect_stats, expect_pl_stats);
// Another set of 130 keys on top of the output from the previous compaction
total_input_key_count = total_output_key_count + 130;
// Merged into 130
total_output_key_count = 130;
{
InternalStats::CompactionStats output_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
output_level_compaction_stats.cpu_micros = 1;
output_level_compaction_stats.micros = 1;
output_level_compaction_stats.bytes_written = 0;
output_level_compaction_stats.bytes_read_non_output_levels =
bytes_written_in_proximal_level;
output_level_compaction_stats.bytes_read_output_level =
bytes_written_output_level;
output_level_compaction_stats.num_input_files_in_non_output_levels = 1;
output_level_compaction_stats.num_input_files_in_output_level = 1;
output_level_compaction_stats.num_input_records = total_input_key_count;
output_level_compaction_stats.num_dropped_records =
total_input_key_count - total_output_key_count;
output_level_compaction_stats.num_output_records = 0;
output_level_compaction_stats.num_output_files = 0;
expect_stats[output_level].Add(output_level_compaction_stats);
}
{
InternalStats::CompactionStats proximal_level_compaction_stats(
CompactionReason::kManualCompaction, 1);
proximal_level_compaction_stats.cpu_micros = 1;
proximal_level_compaction_stats.micros = 1;
proximal_level_compaction_stats.bytes_written =
    bytes_written_in_proximal_level;
proximal_level_compaction_stats.num_output_files = 1;
proximal_level_compaction_stats.num_output_records = total_output_key_count;
expect_pl_stats.Add(proximal_level_compaction_stats);
}
VerifyCompactionStats(expect_stats, expect_pl_stats, output_level);
// move forward the cold_seq, try to split the data into cold and hot, but in
// this case it's unsafe to split the data

db/internal_stats.h

@@ -153,23 +153,6 @@ class InternalStats {
InternalStats(int num_levels, SystemClock* clock, ColumnFamilyData* cfd);
// Per level compaction stats
struct CompactionOutputsStats {
uint64_t num_output_records = 0;
uint64_t bytes_written = 0;
uint64_t bytes_written_blob = 0;
uint64_t num_output_files = 0;
uint64_t num_output_files_blob = 0;
void Add(const CompactionOutputsStats& stats) {
this->num_output_records += stats.num_output_records;
this->bytes_written += stats.bytes_written;
this->bytes_written_blob += stats.bytes_written_blob;
this->num_output_files += stats.num_output_files;
this->num_output_files_blob += stats.num_output_files_blob;
}
};
// Per level compaction stats. comp_stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
@@ -420,15 +403,6 @@ class InternalStats {
}
}
void Add(const CompactionOutputsStats& stats) {
this->num_output_files += static_cast<int>(stats.num_output_files);
this->num_output_records += stats.num_output_records;
this->bytes_written += stats.bytes_written;
this->bytes_written_blob += stats.bytes_written_blob;
this->num_output_files_blob +=
static_cast<int>(stats.num_output_files_blob);
}
void Subtract(const CompactionStats& c) {
this->micros -= c.micros;
this->cpu_micros -= c.cpu_micros;
@@ -473,23 +447,25 @@ class InternalStats {
}
};
// Compaction stats, for per_key_placement compaction, it includes 2 levels
// stats: the last level and the proximal level.
// Compaction internal stats, for per_key_placement compaction, it includes 2
// levels stats: the last level and the proximal level.
struct CompactionStatsFull {
// the stats for the target primary output level
CompactionStats stats;
CompactionStats output_level_stats;
// stats for proximal level output if exist
bool has_proximal_level_output = false;
CompactionStats proximal_level_stats;
explicit CompactionStatsFull() : stats(), proximal_level_stats() {}
explicit CompactionStatsFull()
: output_level_stats(), proximal_level_stats() {}
explicit CompactionStatsFull(CompactionReason reason, int c)
: stats(reason, c), proximal_level_stats(reason, c) {}
: output_level_stats(reason, c), proximal_level_stats(reason, c) {}
uint64_t TotalBytesWritten() const {
uint64_t bytes_written = stats.bytes_written + stats.bytes_written_blob;
uint64_t bytes_written = output_level_stats.bytes_written +
output_level_stats.bytes_written_blob;
if (has_proximal_level_output) {
bytes_written += proximal_level_stats.bytes_written +
proximal_level_stats.bytes_written_blob;
@@ -498,23 +474,23 @@ class InternalStats {
}
uint64_t DroppedRecords() {
uint64_t output_records = stats.num_output_records;
uint64_t output_records = output_level_stats.num_output_records;
if (has_proximal_level_output) {
output_records += proximal_level_stats.num_output_records;
}
if (stats.num_input_records > output_records) {
return stats.num_input_records - output_records;
if (output_level_stats.num_input_records > output_records) {
return output_level_stats.num_input_records - output_records;
}
return 0;
}
void SetMicros(uint64_t val) {
stats.micros = val;
output_level_stats.micros = val;
proximal_level_stats.micros = val;
}
void AddCpuMicros(uint64_t val) {
stats.cpu_micros += val;
output_level_stats.cpu_micros += val;
proximal_level_stats.cpu_micros += val;
}
};
@@ -587,7 +563,7 @@ class InternalStats {
void AddCompactionStats(int level, Env::Priority thread_pri,
const CompactionStatsFull& comp_stats_full) {
AddCompactionStats(level, thread_pri, comp_stats_full.stats);
AddCompactionStats(level, thread_pri, comp_stats_full.output_level_stats);
if (comp_stats_full.has_proximal_level_output) {
per_key_placement_comp_stats_.Add(comp_stats_full.proximal_level_stats);
}
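
For context, a minimal self-contained model of the two-level bookkeeping after the rename. These are simplified stand-ins for illustration only, not the actual RocksDB structs:

```
#include <cassert>
#include <cstdint>

// Simplified stand-ins for InternalStats::CompactionStats and
// CompactionStatsFull; only the fields used by the two methods below.
struct Stats {
  uint64_t num_input_records = 0;
  uint64_t num_output_records = 0;
  uint64_t bytes_written = 0;
  uint64_t bytes_written_blob = 0;
};

struct StatsFull {
  Stats output_level_stats;                // primary output level
  bool has_proximal_level_output = false;  // set for per-key placement
  Stats proximal_level_stats;

  uint64_t TotalBytesWritten() const {
    uint64_t bytes = output_level_stats.bytes_written +
                     output_level_stats.bytes_written_blob;
    if (has_proximal_level_output) {
      bytes += proximal_level_stats.bytes_written +
               proximal_level_stats.bytes_written_blob;
    }
    return bytes;
  }

  uint64_t DroppedRecords() const {
    uint64_t out = output_level_stats.num_output_records;
    if (has_proximal_level_output) {
      out += proximal_level_stats.num_output_records;  // credit both levels
    }
    uint64_t in = output_level_stats.num_input_records;
    return in > out ? in - out : 0;
  }
};

int main() {
  StatsFull s;
  s.output_level_stats.num_input_records = 400;  // all inputs tracked here
  s.output_level_stats.num_output_records = 10;  // kept on the last level
  s.has_proximal_level_output = true;
  s.proximal_level_stats.num_output_records = 120;  // kept hot
  assert(s.DroppedRecords() == 270);  // 400 - (10 + 120)
  return 0;
}
```

The key behavioral point is that DroppedRecords() credits proximal-level outputs before computing the drop count, which is what keeps the job-level drop statistics correct when per-key placement is enabled.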


@@ -0,0 +1 @@
Fixed stats for Tiered Storage with preclude_last_level feature