Add counts to compression statistics

Store the compressed and uncompressed row counts after compressing a
chunk. These counts are saved in the compression_chunk_size catalog
table.
This commit is contained in:
gayyappan 2020-06-10 12:34:18 -04:00 committed by gayyappan
parent 5aaa07b9ee
commit b93b30b0c2
11 changed files with 138 additions and 39 deletions

0
scripts/test_updates.sh Normal file → Executable file
View File

1
scripts/test_updates_pg12.sh Normal file → Executable file
View File

@@ -4,6 +4,7 @@ set -e
set -o pipefail
SCRIPT_DIR=$(dirname $0)
echo $SCRIPT_DIR
TAGS="1.7.0-pg12 1.7.1-pg12"
TEST_VERSION="v6"

View File

@@ -358,6 +358,8 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size (
compressed_heap_size BIGINT NOT NULL,
compressed_toast_size BIGINT NOT NULL,
compressed_index_size BIGINT NOT NULL,
numrows_pre_compression BIGINT,
numrows_post_compression BIGINT,
PRIMARY KEY(chunk_id, compressed_chunk_id)
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');

View File

@@ -58,6 +58,11 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.remote_txn', ''
GRANT SELECT ON _timescaledb_catalog.remote_txn TO PUBLIC;
-- Update drop_chunks policy table
--since we are recreating the bgw_policy_drop_chunks table, it has to be explicitly
-- removed from the extension as it was configured to be dumped by pg_extension_config_dump.
-- So remove it from the extension first and then recreate a new table.
CREATE TABLE _timescaledb_config.bgw_policy_drop_chunks_tmp (
job_id INTEGER PRIMARY KEY
REFERENCES _timescaledb_config.bgw_job(id)
@@ -80,7 +85,6 @@ INSERT INTO _timescaledb_config.bgw_policy_drop_chunks_tmp (
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_config.bgw_policy_drop_chunks;
DROP VIEW IF EXISTS timescaledb_information.policy_stats;
DROP VIEW IF EXISTS timescaledb_information.policy_stats;
DROP VIEW IF EXISTS timescaledb_information.drop_chunks_policies;
DROP TABLE IF EXISTS _timescaledb_config.bgw_policy_drop_chunks;
@@ -102,3 +106,15 @@ INSERT INTO _timescaledb_config.bgw_policy_drop_chunks (
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_drop_chunks', '');
DROP TABLE _timescaledb_config.bgw_policy_drop_chunks_tmp;
GRANT SELECT ON _timescaledb_config.bgw_policy_drop_chunks TO PUBLIC;
DROP VIEW IF EXISTS timescaledb_information.compressed_hypertable_stats;
DROP VIEW IF EXISTS timescaledb_information.compressed_chunk_stats;
-- all existing compressed chunks have NULL value for the new columns
ALTER TABLE IF EXISTS _timescaledb_catalog.compression_chunk_size ADD COLUMN IF NOT EXISTS numrows_pre_compression BIGINT;
ALTER TABLE IF EXISTS _timescaledb_catalog.compression_chunk_size ADD COLUMN IF NOT EXISTS numrows_post_compression BIGINT;
--rewrite catalog table to not break catalog scans on tables with missingval optimization
CLUSTER _timescaledb_catalog.compression_chunk_size USING compression_chunk_size_pkey;
ALTER TABLE _timescaledb_catalog.compression_chunk_size SET WITHOUT CLUSTER;

View File

@@ -1241,6 +1241,8 @@ typedef enum Anum_compression_chunk_size
Anum_compression_chunk_size_compressed_heap_size,
Anum_compression_chunk_size_compressed_toast_size,
Anum_compression_chunk_size_compressed_index_size,
Anum_compression_chunk_size_numrows_pre_compression,
Anum_compression_chunk_size_numrows_post_compression,
_Anum_compression_chunk_size_max,
} Anum_compression_chunk_size;
@@ -1256,6 +1258,8 @@ typedef struct FormData_compression_chunk_size
int64 compressed_heap_size;
int64 compressed_toast_size;
int64 compressed_index_size;
int64 numrows_pre_compression;
int64 numrows_post_compression;
} FormData_compression_chunk_size;
typedef FormData_compression_chunk_size *Form_compression_chunk_size;

View File

@@ -47,17 +47,41 @@ ts_compression_chunk_size_totals()
ts_scanner_foreach(&iterator)
{
bool nulls[Natts_compression_chunk_size];
Datum values[Natts_compression_chunk_size];
FormData_compression_chunk_size fd;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
FormData_compression_chunk_size *fd = STRUCT_FROM_TUPLE(ti->tuple,
ti->mctx,
FormData_compression_chunk_size,
FormData_compression_chunk_size);
sizes.uncompressed_heap_size += fd->uncompressed_heap_size;
sizes.uncompressed_index_size += fd->uncompressed_index_size;
sizes.uncompressed_toast_size += fd->uncompressed_toast_size;
sizes.compressed_heap_size += fd->compressed_heap_size;
sizes.compressed_index_size += fd->compressed_index_size;
sizes.compressed_toast_size += fd->compressed_toast_size;
heap_deform_tuple(ti->tuple, ti->desc, values, nulls);
memset(&fd, 0, sizeof(FormData_compression_chunk_size));
Assert(!nulls[AttrNumberGetAttrOffset(Anum_compression_chunk_size_uncompressed_heap_size)]);
Assert(
!nulls[AttrNumberGetAttrOffset(Anum_compression_chunk_size_uncompressed_index_size)]);
Assert(
!nulls[AttrNumberGetAttrOffset(Anum_compression_chunk_size_uncompressed_toast_size)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_heap_size)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_index_size)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_toast_size)]);
fd.uncompressed_heap_size = DatumGetInt64(
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_uncompressed_heap_size)]);
fd.uncompressed_index_size = DatumGetInt64(
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_uncompressed_index_size)]);
fd.uncompressed_toast_size = DatumGetInt64(
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_uncompressed_toast_size)]);
fd.compressed_heap_size = DatumGetInt64(
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_heap_size)]);
fd.compressed_index_size = DatumGetInt64(
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_index_size)]);
fd.compressed_toast_size = DatumGetInt64(
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_toast_size)]);
sizes.uncompressed_heap_size += fd.uncompressed_heap_size;
sizes.uncompressed_index_size += fd.uncompressed_index_size;
sizes.uncompressed_toast_size += fd.uncompressed_toast_size;
sizes.compressed_heap_size += fd.compressed_heap_size;
sizes.compressed_index_size += fd.compressed_index_size;
sizes.compressed_toast_size += fd.compressed_toast_size;
}
return sizes;

View File

@@ -80,7 +80,8 @@ compute_chunk_size(Oid chunk_relid)
static void
compression_chunk_size_catalog_insert(int32 src_chunk_id, ChunkSize *src_size,
int32 compress_chunk_id, ChunkSize *compress_size)
int32 compress_chunk_id, ChunkSize *compress_size,
int64 rowcnt_pre_compression, int64 rowcnt_post_compression)
{
Catalog *catalog = ts_catalog_get();
Relation rel;
@@ -111,6 +112,10 @@ compression_chunk_size_catalog_insert(int32 src_chunk_id, ChunkSize *src_size,
Int64GetDatum(compress_size->toast_size);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_index_size)] =
Int64GetDatum(compress_size->index_size);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_pre_compression)] =
Int64GetDatum(rowcnt_pre_compression);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_post_compression)] =
Int64GetDatum(rowcnt_post_compression);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
@@ -214,6 +219,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
const ColumnCompressionInfo **colinfo_array;
int i = 0, htcols_listlen;
ChunkSize before_size, after_size;
CompressionStats cstat;
hcache = ts_hypertable_cache_pin();
compresschunkcxt_init(&cxt, hcache, hypertable_relid, chunk_relid);
@@ -241,7 +247,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
colinfo_array[i++] = fd;
}
before_size = compute_chunk_size(cxt.srcht_chunk->table_id);
compress_chunk(cxt.srcht_chunk->table_id,
cstat = compress_chunk(cxt.srcht_chunk->table_id,
compress_ht_chunk->table_id,
colinfo_array,
htcols_listlen);
@@ -267,7 +273,9 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
compression_chunk_size_catalog_insert(cxt.srcht_chunk->fd.id,
&before_size,
compress_ht_chunk->fd.id,
&after_size);
&after_size,
cstat.rowcnt_pre_compression,
cstat.rowcnt_post_compression);
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id, false);
ts_cache_release(hcache);

View File

@@ -136,6 +136,8 @@ typedef struct RowCompressor
/* cached arrays used to build the HeapTuple */
Datum *compressed_values;
bool *compressed_is_null;
int64 rowcnt_pre_compression;
int64 num_compressed_rows;
} RowCompressor;
static int16 *compress_chunk_populate_keys(Oid in_table, const ColumnCompressionInfo **columns,
@@ -212,12 +214,13 @@ truncate_relation(Oid table_oid)
reindex_relation(table_oid, REINDEX_REL_PROCESS_TOAST, 0);
}
void
CompressionStats
compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column_compression_info,
int num_compression_infos)
{
int n_keys;
const ColumnCompressionInfo **keys;
CompressionStats cstat;
/* We want to prevent other compressors from compressing this table,
* and we want to prevent INSERTs or UPDATEs which could mess up our compression.
@@ -269,6 +272,9 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
table_close(out_rel, NoLock);
table_close(in_rel, NoLock);
cstat.rowcnt_pre_compression = row_compressor.rowcnt_pre_compression;
cstat.rowcnt_post_compression = row_compressor.num_compressed_rows;
return cstat;
}
static int16 *
@@ -482,6 +488,8 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
.compressed_is_null = palloc(sizeof(bool) * num_columns_in_compressed_table),
.rows_compressed_into_current_value = 0,
.rowcnt_pre_compression = 0,
.num_compressed_rows = 0,
.sequence_num = SEQUENCE_NUM_GAP,
};
@@ -823,7 +831,8 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
row_compressor->compressed_values[compressed_col] = 0;
row_compressor->compressed_is_null[compressed_col] = true;
}
row_compressor->rowcnt_pre_compression += row_compressor->rows_compressed_into_current_value;
row_compressor->num_compressed_rows++;
row_compressor->rows_compressed_into_current_value = 0;
MemoryContextReset(row_compressor->per_row_ctx);
}

View File

@@ -107,6 +107,12 @@ typedef enum CompressionAlgorithms
_MAX_NUM_COMPRESSION_ALGORITHMS = 128,
} CompressionAlgorithms;
/* Row-count statistics produced by compress_chunk() and written into the
 * _timescaledb_catalog.compression_chunk_size catalog table by
 * compression_chunk_size_catalog_insert(). */
typedef struct CompressionStats
{
int64 rowcnt_pre_compression; /* rows read from the uncompressed chunk */
int64 rowcnt_post_compression; /* compressed rows written to the compressed chunk */
} CompressionStats;
extern Datum tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_decompress_reverse(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_send(PG_FUNCTION_ARGS);
@@ -136,8 +142,9 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
}
extern CompressionStorage compression_get_toast_storage(CompressionAlgorithms algo);
extern void compress_chunk(Oid in_table, Oid out_table,
const ColumnCompressionInfo **column_compression_info, int num_columns);
extern CompressionStats compress_chunk(Oid in_table, Oid out_table,
const ColumnCompressionInfo **column_compression_info,
int num_columns);
extern void decompress_chunk(Oid in_table, Oid out_table);
extern DecompressionIterator *(*tsl_get_decompression_iterator_init(

View File

@@ -116,7 +116,7 @@ select compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');
\x
select * from _timescaledb_catalog.compression_chunk_size
order by chunk_id;
-[ RECORD 1 ]-----------+------
-[ RECORD 1 ]------------+------
chunk_id | 1
compressed_chunk_id | 6
uncompressed_heap_size | 8192
@@ -125,7 +125,9 @@ uncompressed_index_size | 32768
compressed_heap_size | 8192
compressed_toast_size | 8192
compressed_index_size | 32768
-[ RECORD 2 ]-----------+------
numrows_pre_compression | 1
numrows_post_compression | 1
-[ RECORD 2 ]------------+------
chunk_id | 2
compressed_chunk_id | 5
uncompressed_heap_size | 8192
@@ -134,6 +136,8 @@ uncompressed_index_size | 32768
compressed_heap_size | 8192
compressed_toast_size | 8192
compressed_index_size | 32768
numrows_pre_compression | 1
numrows_post_compression | 1
\x
select ch1.id, ch1.schema_name, ch1.table_name , ch2.table_name as compress_table
@@ -345,6 +349,22 @@ SELECT _ts_meta_sequence_num from :COMPRESSED_CHUNK_NAME;
(2 rows)
\x
SELECT chunk_id, numrows_pre_compression, numrows_post_compression
FROM _timescaledb_catalog.chunk srcch,
_timescaledb_catalog.compression_chunk_size map,
_timescaledb_catalog.hypertable srcht
WHERE map.chunk_id = srcch.id and srcht.id = srcch.hypertable_id
and srcht.table_name like 'conditions'
order by chunk_id;
-[ RECORD 1 ]------------+---
chunk_id | 12
numrows_pre_compression | 42
numrows_post_compression | 2
-[ RECORD 2 ]------------+---
chunk_id | 13
numrows_pre_compression | 20
numrows_post_compression | 2
select * from timescaledb_information.compressed_chunk_stats
where hypertable_name::text like 'conditions'
order by hypertable_name, chunk_name;

View File

@@ -157,6 +157,14 @@ SELECT sum(_ts_meta_count) from :COMPRESSED_CHUNK_NAME;
SELECT _ts_meta_sequence_num from :COMPRESSED_CHUNK_NAME;
\x
SELECT chunk_id, numrows_pre_compression, numrows_post_compression
FROM _timescaledb_catalog.chunk srcch,
_timescaledb_catalog.compression_chunk_size map,
_timescaledb_catalog.hypertable srcht
WHERE map.chunk_id = srcch.id and srcht.id = srcch.hypertable_id
and srcht.table_name like 'conditions'
order by chunk_id;
select * from timescaledb_information.compressed_chunk_stats
where hypertable_name::text like 'conditions'
order by hypertable_name, chunk_name;