diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index c7c511465..5db3d4bfe 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -41,6 +41,8 @@
 #include "segment_meta.h"
 
 #define MAX_ROWS_PER_COMPRESSION 1000
+/* gap in sequence ids between rows, leaving room to add rows in the gap later */
+#define SEQUENCE_NUM_GAP 10
 #define COMPRESSIONCOL_IS_SEGMENT_BY(col) (col->segmentby_column_index > 0)
 #define COMPRESSIONCOL_IS_ORDER_BY(col) (col->orderby_column_index > 0)
 
@@ -116,9 +118,12 @@ typedef struct RowCompressor
 	 */
 	int16 *uncompressed_col_to_compressed_col;
 	int16 count_metadata_column_offset;
+	int16 sequence_num_metadata_column_offset;
 
 	/* the number of uncompressed rows compressed into the current compressed row */
 	uint32 rows_compressed_into_current_value;
+	/* a unique, monotonically increasing (according to order by) id for each compressed row */
+	int32 sequence_num;
 
 	/* cached arrays used to build the HeapTuple */
 	Datum *compressed_values;
@@ -414,8 +419,12 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
 	int col;
 	Name count_metadata_name = DatumGetName(
 		DirectFunctionCall1(namein, CStringGetDatum(COMPRESSION_COLUMN_METADATA_COUNT_NAME)));
+	Name sequence_num_metadata_name = DatumGetName(
+		DirectFunctionCall1(namein,
+							CStringGetDatum(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME)));
 	AttrNumber count_metadata_column_num = attno_find_by_attname(out_desc, count_metadata_name);
-	int16 count_metadata_column_offset;
+	AttrNumber sequence_num_column_num =
+		attno_find_by_attname(out_desc, sequence_num_metadata_name);
 	Oid compressed_data_type_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
 
 	if (count_metadata_column_num == InvalidAttrNumber)
@@ -423,7 +432,10 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
 			 "missing metadata column '%s' in compressed table",
 			 COMPRESSION_COLUMN_METADATA_COUNT_NAME);
 
-	count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num);
+	if (sequence_num_column_num == InvalidAttrNumber)
+		elog(ERROR,
+			 "missing metadata column '%s' in compressed table",
+			 COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME);
 
 	*row_compressor = (RowCompressor){
 		.compressed_table = compressed_table,
@@ -433,10 +445,12 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
 		.uncompressed_col_to_compressed_col =
 			palloc0(sizeof(*row_compressor->uncompressed_col_to_compressed_col) *
 					uncompressed_tuple_desc->natts),
-		.count_metadata_column_offset = count_metadata_column_offset,
+		.count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num),
+		.sequence_num_metadata_column_offset = AttrNumberGetAttrOffset(sequence_num_column_num),
 		.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
 		.compressed_is_null = palloc(sizeof(bool) * num_columns_in_compressed_table),
 		.rows_compressed_into_current_value = 0,
+		.sequence_num = SEQUENCE_NUM_GAP,
 	};
 
 	memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_columns_in_compressed_table);
@@ -703,6 +717,16 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
 		Int32GetDatum(row_compressor->rows_compressed_into_current_value);
 	row_compressor->compressed_is_null[row_compressor->count_metadata_column_offset] = false;
 
+	row_compressor->compressed_values[row_compressor->sequence_num_metadata_column_offset] =
+		Int32GetDatum(row_compressor->sequence_num);
+	row_compressor->compressed_is_null[row_compressor->sequence_num_metadata_column_offset] = false;
+
+	/* overflow can happen only if a chunk has more than ~200 billion rows */
+	if (row_compressor->sequence_num > PG_INT32_MAX - SEQUENCE_NUM_GAP)
+		elog(ERROR, "sequence id overflow");
+
+	row_compressor->sequence_num += SEQUENCE_NUM_GAP;
+
 	compressed_tuple = heap_form_tuple(RelationGetDescr(row_compressor->compressed_table),
 									   row_compressor->compressed_values,
 									   row_compressor->compressed_is_null);
diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c
index a54acfe1d..6a45afe25 100644
--- a/tsl/src/compression/create.c
+++ b/tsl/src/compression/create.c
@@ -131,6 +131,14 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc)
 										  INT4OID,
 										  -1 /* typemod */,
 										  0 /*collation*/));
+	/* sequence_num column */
+	cc->coldeflist = lappend(cc->coldeflist,
+
+							 /* a monotonically increasing id assigned to each compressed row */
+							 makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME,
+										   INT4OID,
+										   -1 /* typemod */,
+										   0 /*collation*/));
 
 	for (colno = 0; colno < cc->numcols; colno++)
 	{
diff --git a/tsl/src/compression/create.h b/tsl/src/compression/create.h
index 6897fceaf..3df15166d 100644
--- a/tsl/src/compression/create.h
+++ b/tsl/src/compression/create.h
@@ -14,6 +14,8 @@
 
 #define COMPRESSION_COLUMN_METADATA_PREFIX "_ts_meta_"
 #define COMPRESSION_COLUMN_METADATA_COUNT_NAME COMPRESSION_COLUMN_METADATA_PREFIX "count"
+#define COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME \
+	COMPRESSION_COLUMN_METADATA_PREFIX "sequence_num"
 
 bool tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
 								WithClauseResult *with_clause_options);
diff --git a/tsl/test/expected/compress_table.out b/tsl/test/expected/compress_table.out
index b1d22e281..12698fab4 100644
--- a/tsl/test/expected/compress_table.out
+++ b/tsl/test/expected/compress_table.out
@@ -50,6 +50,7 @@ CREATE TABLE uncompressed(
     texts TEXT);
 CREATE TABLE compressed(
     _ts_meta_count int,
+    _ts_meta_sequence_num int,
     _ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
     _ts_meta_min_max_2 _timescaledb_internal.segment_meta_min_max,
     time _timescaledb_internal.compressed_data,
@@ -396,6 +397,7 @@ CREATE TABLE uncompressed(
     time FLOAT);
 CREATE TABLE compressed(
     _ts_meta_count int,
+    _ts_meta_sequence_num int,
    _ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
     b _timescaledb_internal.compressed_data,
     device _timescaledb_internal.compressed_data,
diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out
index 79c5c6df0..f8fcd2cf5 100644
--- a/tsl/test/expected/compression.out
+++ b/tsl/test/expected/compression.out
@@ -117,6 +117,8 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::
 alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time');
 insert into conditions
 select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
+insert into conditions
+select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'NYC', 'klick', 55, 75;
 select hypertable_id, attname, compression_algorithm_id , al.name
 from _timescaledb_catalog.hypertable_compression hc,
      _timescaledb_catalog.hypertable ht,
@@ -136,16 +138,17 @@ where cl.oid = at.attrelid and at.attnum > 0
 and cl.relname = '_compressed_hypertable_4'
 and atttypid = ty.oid
 order by at.attnum;
-      attname       | attstorage |       typname        
---------------------+------------+----------------------
- time               | e          | compressed_data
- location           | x          | text
- location2          | x          | compressed_data
- temperature        | e          | compressed_data
- humidity           | e          | compressed_data
- _ts_meta_count     | p          | int4
- _ts_meta_min_max_1 | e          | segment_meta_min_max
-(7 rows)
+        attname        | attstorage |       typname        
+-----------------------+------------+----------------------
+ time                  | e          | compressed_data
+ location              | x          | text
+ location2             | x          | compressed_data
+ temperature           | e          | compressed_data
+ humidity              | e          | compressed_data
+ _ts_meta_count        | p          | int4
+ _ts_meta_sequence_num | p          | int4
+ _ts_meta_min_max_1    | e          | segment_meta_min_max
+(8 rows)
 
 SELECT ch1.schema_name|| '.' || ch1.table_name as "CHUNK_NAME", ch1.id "CHUNK_ID"
 FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions'
@@ -153,15 +156,15 @@ LIMIT 1 \gset
 SELECT count(*) from :CHUNK_NAME;
  count 
 -------
-    21
+    42
 (1 row)
 
 SELECT count(*) as "ORIGINAL_CHUNK_COUNT" from :CHUNK_NAME \gset
 select tableoid::regclass, count(*) from conditions group by tableoid order by tableoid;
                 tableoid                 | count 
 -----------------------------------------+-------
- _timescaledb_internal._hyper_3_7_chunk  |    21
- _timescaledb_internal._hyper_3_8_chunk  |    10
+ _timescaledb_internal._hyper_3_7_chunk  |    42
+ _timescaledb_internal._hyper_3_8_chunk  |    20
 (2 rows)
 
 select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
@@ -176,7 +179,7 @@ FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch
 select tableoid::regclass, count(*) from conditions group by tableoid order by tableoid;
                 tableoid                 | count 
 -----------------------------------------+-------
- _timescaledb_internal._hyper_3_8_chunk  |    10
+ _timescaledb_internal._hyper_3_8_chunk  |    20
 (1 row)
 
 select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
@@ -203,15 +206,22 @@ SELECT count(*) from :CHUNK_NAME;
 SELECT count(*) from :COMPRESSED_CHUNK_NAME;
  count 
 -------
-     1
+     2
 (1 row)
 
 SELECT sum(_ts_meta_count) from :COMPRESSED_CHUNK_NAME;
  sum 
 -----
-  21
+  42
 (1 row)
 
+SELECT _ts_meta_sequence_num from :COMPRESSED_CHUNK_NAME;
+ _ts_meta_sequence_num 
+-----------------------
+                    10
+                    20
+(2 rows)
+
 \x
 select * from timescaledb_information.compressed_chunk_size
 where hypertable_name::text like 'conditions'
@@ -274,7 +284,7 @@ FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch
 SELECT count(*), count(*) = :'ORIGINAL_CHUNK_COUNT' from :CHUNK_NAME;
  count | ?column? 
 -------+----------
-    21 | t
+    42 | t
 (1 row)
 
 --check that the compressed chunk is dropped
diff --git a/tsl/test/sql/compress_table.sql b/tsl/test/sql/compress_table.sql
index 9f739f180..df1efba1d 100644
--- a/tsl/test/sql/compress_table.sql
+++ b/tsl/test/sql/compress_table.sql
@@ -49,6 +49,7 @@ CREATE TABLE uncompressed(
 
 CREATE TABLE compressed(
     _ts_meta_count int,
+    _ts_meta_sequence_num int,
     _ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
     _ts_meta_min_max_2 _timescaledb_internal.segment_meta_min_max,
     time _timescaledb_internal.compressed_data,
@@ -175,6 +176,7 @@ CREATE TABLE uncompressed(
 
 CREATE TABLE compressed(
     _ts_meta_count int,
+    _ts_meta_sequence_num int,
     _ts_meta_min_max_1 _timescaledb_internal.segment_meta_min_max,
     b _timescaledb_internal.compressed_data,
     device _timescaledb_internal.compressed_data,
diff --git a/tsl/test/sql/compression.sql b/tsl/test/sql/compression.sql
index d80d2082f..c2016c679 100644
--- a/tsl/test/sql/compression.sql
+++ b/tsl/test/sql/compression.sql
@@ -51,6 +51,8 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::
 alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time');
 insert into conditions
 select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
+insert into conditions
+select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'NYC', 'klick', 55, 75;
 
 select hypertable_id, attname, compression_algorithm_id , al.name
 from _timescaledb_catalog.hypertable_compression hc,
@@ -92,6 +94,7 @@ where uncompressed.compressed_chunk_id = compressed.id AND uncompressed.id = :'C
 SELECT count(*) from :CHUNK_NAME;
 SELECT count(*) from :COMPRESSED_CHUNK_NAME;
 SELECT sum(_ts_meta_count) from :COMPRESSED_CHUNK_NAME;
+SELECT _ts_meta_sequence_num from :COMPRESSED_CHUNK_NAME;
 \x
 select * from timescaledb_information.compressed_chunk_size
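
Illustrative sketch, not part of the patch: each compressed row now gets a
_ts_meta_sequence_num that grows by SEQUENCE_NUM_GAP (10) in the order the rows
are flushed, leaving gaps so rows could later be slotted in between existing
sequence numbers. Assuming a compressed chunk named
_timescaledb_internal.compress_hyper_4_10_chunk (hypothetical name) and the
segmentby column "location" used in the tests above, the compressed rows can be
listed in segment/sequence order roughly like this:

    -- hypothetical chunk name; columns follow the compressed-table schema above
    SELECT location, _ts_meta_count, _ts_meta_sequence_num
    FROM _timescaledb_internal.compress_hyper_4_10_chunk
    ORDER BY location, _ts_meta_sequence_num;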