Add sequence id metadata col to compressed table

Add a sequence id to the compressed table. This id increments
monotonically for each compressed row in a way that follows
the order by clause. We leave gaps in the sequence to allow
rows to be filled in later, e.g. due to inserts down
the line.

The sequence id is global to the entire chunk and does not reset
for each segment-by group change, since this has the potential
to allow some micro-optimizations when ordering by segment-by
columns as well.

The sequence number is an INT32, which allows up to 200 billion
uncompressed rows per chunk to be supported (assuming 1000 rows
per compressed row and a gap of 10). Overflow is checked in the
code and will error if this is breached.
This commit is contained in:
Matvey Arye 2019-08-27 12:27:43 -04:00 committed by Matvey Arye
parent 6d0dfdfe1a
commit 5c891f732e
7 changed files with 71 additions and 20 deletions

View File

@ -41,6 +41,8 @@
#include "segment_meta.h"
#define MAX_ROWS_PER_COMPRESSION 1000
/* gap in sequence id between rows, potential for adding rows in gap later */
#define SEQUENCE_NUM_GAP 10
#define COMPRESSIONCOL_IS_SEGMENT_BY(col) (col->segmentby_column_index > 0)
#define COMPRESSIONCOL_IS_ORDER_BY(col) (col->orderby_column_index > 0)
@ -116,9 +118,12 @@ typedef struct RowCompressor
*/
int16 *uncompressed_col_to_compressed_col;
int16 count_metadata_column_offset;
int16 sequence_num_metadata_column_offset;
/* the number of uncompressed rows compressed into the current compressed row */
uint32 rows_compressed_into_current_value;
/* a unique monotonically increasing (according to order by) id for each compressed row */
int32 sequence_num;
/* cached arrays used to build the HeapTuple */
Datum *compressed_values;
@ -414,8 +419,12 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
int col;
Name count_metadata_name = DatumGetName(
DirectFunctionCall1(namein, CStringGetDatum(COMPRESSION_COLUMN_METADATA_COUNT_NAME)));
Name sequence_num_metadata_name = DatumGetName(
DirectFunctionCall1(namein,
CStringGetDatum(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME)));
AttrNumber count_metadata_column_num = attno_find_by_attname(out_desc, count_metadata_name);
int16 count_metadata_column_offset;
AttrNumber sequence_num_column_num =
attno_find_by_attname(out_desc, sequence_num_metadata_name);
Oid compressed_data_type_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
if (count_metadata_column_num == InvalidAttrNumber)
@ -423,7 +432,10 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
"missing metadata column '%s' in compressed table",
COMPRESSION_COLUMN_METADATA_COUNT_NAME);
count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num);
if (sequence_num_column_num == InvalidAttrNumber)
elog(ERROR,
"missing metadata column '%s' in compressed table",
COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME);
*row_compressor = (RowCompressor){
.compressed_table = compressed_table,
@ -433,10 +445,12 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
.uncompressed_col_to_compressed_col =
palloc0(sizeof(*row_compressor->uncompressed_col_to_compressed_col) *
uncompressed_tuple_desc->natts),
.count_metadata_column_offset = count_metadata_column_offset,
.count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num),
.sequence_num_metadata_column_offset = AttrNumberGetAttrOffset(sequence_num_column_num),
.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
.compressed_is_null = palloc(sizeof(bool) * num_columns_in_compressed_table),
.rows_compressed_into_current_value = 0,
.sequence_num = SEQUENCE_NUM_GAP,
};
memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_columns_in_compressed_table);
@ -703,6 +717,16 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
Int32GetDatum(row_compressor->rows_compressed_into_current_value);
row_compressor->compressed_is_null[row_compressor->count_metadata_column_offset] = false;
row_compressor->compressed_values[row_compressor->sequence_num_metadata_column_offset] =
Int32GetDatum(row_compressor->sequence_num);
row_compressor->compressed_is_null[row_compressor->sequence_num_metadata_column_offset] = false;
/* overflow could happen only if chunk has more than 200B rows */
if (row_compressor->sequence_num > PG_INT32_MAX - SEQUENCE_NUM_GAP)
elog(ERROR, "sequence id overflow");
row_compressor->sequence_num += SEQUENCE_NUM_GAP;
compressed_tuple = heap_form_tuple(RelationGetDescr(row_compressor->compressed_table),
row_compressor->compressed_values,
row_compressor->compressed_is_null);

View File

@ -131,6 +131,14 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc)
INT4OID,
-1 /* typemod */,
0 /*collation*/));
/* sequence_num column */
cc->coldeflist = lappend(cc->coldeflist,
                         /* unique, monotonically increasing sequence id of the compressed row */
makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME,
INT4OID,
-1 /* typemod */,
0 /*collation*/));
for (colno = 0; colno < cc->numcols; colno++)
{

View File

@ -14,6 +14,8 @@
#define COMPRESSION_COLUMN_METADATA_PREFIX "_ts_meta_"
#define COMPRESSION_COLUMN_METADATA_COUNT_NAME COMPRESSION_COLUMN_METADATA_PREFIX "count"
#define COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME \
COMPRESSION_COLUMN_METADATA_PREFIX "sequence_num"
bool tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options);

View File

@ -50,6 +50,7 @@ CREATE TABLE uncompressed(
texts TEXT);
CREATE TABLE compressed(
_ts_meta_count int,
_ts_meta_sequence_num int,
_ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
_ts_meta_min_max_2 _timescaledb_internal.segment_meta_min_max,
time _timescaledb_internal.compressed_data,
@ -396,6 +397,7 @@ CREATE TABLE uncompressed(
time FLOAT);
CREATE TABLE compressed(
_ts_meta_count int,
_ts_meta_sequence_num int,
_ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
b _timescaledb_internal.compressed_data,
device _timescaledb_internal.compressed_data,

View File

@ -117,6 +117,8 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::
alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time');
insert into conditions
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
insert into conditions
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'NYC', 'klick', 55, 75;
select hypertable_id, attname, compression_algorithm_id , al.name
from _timescaledb_catalog.hypertable_compression hc,
_timescaledb_catalog.hypertable ht,
@ -136,16 +138,17 @@ where cl.oid = at.attrelid and at.attnum > 0
and cl.relname = '_compressed_hypertable_4'
and atttypid = ty.oid
order by at.attnum;
attname | attstorage | typname
--------------------+------------+----------------------
time | e | compressed_data
location | x | text
location2 | x | compressed_data
temperature | e | compressed_data
humidity | e | compressed_data
_ts_meta_count | p | int4
_ts_meta_min_max_1 | e | segment_meta_min_max
(7 rows)
attname | attstorage | typname
-----------------------+------------+----------------------
time | e | compressed_data
location | x | text
location2 | x | compressed_data
temperature | e | compressed_data
humidity | e | compressed_data
_ts_meta_count | p | int4
_ts_meta_sequence_num | p | int4
_ts_meta_min_max_1 | e | segment_meta_min_max
(8 rows)
SELECT ch1.schema_name|| '.' || ch1.table_name as "CHUNK_NAME", ch1.id "CHUNK_ID"
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions'
@ -153,15 +156,15 @@ LIMIT 1 \gset
SELECT count(*) from :CHUNK_NAME;
count
-------
21
42
(1 row)
SELECT count(*) as "ORIGINAL_CHUNK_COUNT" from :CHUNK_NAME \gset
select tableoid::regclass, count(*) from conditions group by tableoid order by tableoid;
tableoid | count
----------------------------------------+-------
_timescaledb_internal._hyper_3_7_chunk | 21
_timescaledb_internal._hyper_3_8_chunk | 10
_timescaledb_internal._hyper_3_7_chunk | 42
_timescaledb_internal._hyper_3_8_chunk | 20
(2 rows)
select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
@ -176,7 +179,7 @@ FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch
select tableoid::regclass, count(*) from conditions group by tableoid order by tableoid;
tableoid | count
----------------------------------------+-------
_timescaledb_internal._hyper_3_8_chunk | 10
_timescaledb_internal._hyper_3_8_chunk | 20
(1 row)
select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
@ -203,15 +206,22 @@ SELECT count(*) from :CHUNK_NAME;
SELECT count(*) from :COMPRESSED_CHUNK_NAME;
count
-------
1
2
(1 row)
SELECT sum(_ts_meta_count) from :COMPRESSED_CHUNK_NAME;
sum
-----
21
42
(1 row)
SELECT _ts_meta_sequence_num from :COMPRESSED_CHUNK_NAME;
_ts_meta_sequence_num
-----------------------
10
20
(2 rows)
\x
select * from timescaledb_information.compressed_chunk_size
where hypertable_name::text like 'conditions'
@ -274,7 +284,7 @@ FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch
SELECT count(*), count(*) = :'ORIGINAL_CHUNK_COUNT' from :CHUNK_NAME;
count | ?column?
-------+----------
21 | t
42 | t
(1 row)
--check that the compressed chunk is dropped

View File

@ -49,6 +49,7 @@ CREATE TABLE uncompressed(
CREATE TABLE compressed(
_ts_meta_count int,
_ts_meta_sequence_num int,
_ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
_ts_meta_min_max_2 _timescaledb_internal.segment_meta_min_max,
time _timescaledb_internal.compressed_data,
@ -175,6 +176,7 @@ CREATE TABLE uncompressed(
CREATE TABLE compressed(
_ts_meta_count int,
_ts_meta_sequence_num int,
_ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
b _timescaledb_internal.compressed_data,
device _timescaledb_internal.compressed_data,

View File

@ -51,6 +51,8 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::
alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time');
insert into conditions
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
insert into conditions
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'NYC', 'klick', 55, 75;
select hypertable_id, attname, compression_algorithm_id , al.name
from _timescaledb_catalog.hypertable_compression hc,
@ -92,6 +94,7 @@ where uncompressed.compressed_chunk_id = compressed.id AND uncompressed.id = :'C
SELECT count(*) from :CHUNK_NAME;
SELECT count(*) from :COMPRESSED_CHUNK_NAME;
SELECT sum(_ts_meta_count) from :COMPRESSED_CHUNK_NAME;
SELECT _ts_meta_sequence_num from :COMPRESSED_CHUNK_NAME;
\x
select * from timescaledb_information.compressed_chunk_size