From df4c4445513de501624607368f17fca64842a84d Mon Sep 17 00:00:00 2001
From: Matvey Arye
Date: Fri, 6 Sep 2019 17:20:33 -0400
Subject: [PATCH] Delete related rows for compression

This fixes deletion of related rows when we have compressed
hypertables. Namely, we delete rows from:

- compression_chunk_size
- hypertable_compression

We also fix hypertable_compression to handle NULLs correctly.

We add a stub for tests that combine continuous aggs with compression,
but that is broken for now, so it is commented out; it will be fixed in
another PR.
---
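[ Editor's note, placed below the "---" cut line so that git am ignores it:
the hypertable_compression.c change below switches NULL handling from
repeated heap_getattr() calls to a single heap_deform_tuple(). A minimal
sketch of that idiom, assuming the PostgreSQL heap APIs already included by
that file; "fetch_int16_or_default" is a hypothetical helper written for
illustration, not part of this patch:

static int16
fetch_int16_or_default(HeapTuple tuple, TupleDesc desc, AttrNumber attno, int16 dflt)
{
	/* One pass fills parallel values[]/isnulls[] arrays for every column. */
	Datum values[Natts_hypertable_compression];
	bool isnulls[Natts_hypertable_compression];

	heap_deform_tuple(tuple, desc, values, isnulls);

	/* Every lookup is forced through an explicit NULL check, unlike a
	 * heap_getattr() call whose isnull out-parameter is easy to ignore. */
	if (isnulls[AttrNumberGetAttrOffset(attno)])
		return dflt;

	return DatumGetInt16(values[AttrNumberGetAttrOffset(attno)]);
}
]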
 src/CMakeLists.txt                          |  1 +
 src/chunk.c                                 |  2 +
 src/compression_chunk_size.c                | 40 ++++++++++++++
 src/compression_chunk_size.h                | 13 +++++
 src/hypertable.c                            |  4 ++
 src/hypertable_compression.c                | 61 +++++++++++++--------
 src/hypertable_compression.h                |  9 ++-
 tsl/src/compression/compress_utils.c        | 33 +----------
 tsl/src/compression/create.c                |  4 +-
 tsl/src/decompress_chunk/decompress_chunk.c |  2 +-
 tsl/src/decompress_chunk/exec.c             |  2 +-
 tsl/test/expected/compression_ddl.out       | 43 ++++++++++++++-
 tsl/test/sql/compression_ddl.sql            | 30 +++++++++-
 13 files changed, 179 insertions(+), 65 deletions(-)
 create mode 100644 src/compression_chunk_size.c
 create mode 100644 src/compression_chunk_size.h

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e8581b700..47aca2d13 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -17,6 +17,7 @@ set(SOURCES
 	constraint_aware_append.c
 	cross_module_fn.c
 	copy.c
+	compression_chunk_size.c
 	compression_with_clause.c
 	compression_segment_meta_min_max.c
 	dimension.c
diff --git a/src/chunk.c b/src/chunk.c
index 0a3ee4450..c924475a8 100644
--- a/src/chunk.c
+++ b/src/chunk.c
@@ -55,6 +55,7 @@
 #include "cache.h"
 #include "bgw_policy/chunk_stats.h"
 #include "scan_iterator.h"
+#include "compression_chunk_size.h"
 
 TS_FUNCTION_INFO_V1(ts_chunk_show_chunks);
 TS_FUNCTION_INFO_V1(ts_chunk_drop_chunks);
@@ -1701,6 +1702,7 @@ chunk_tuple_delete(TupleInfo *ti, void *data)
 	chunk_formdata_fill(&form, ti->tuple, ti->desc);
 	ts_chunk_constraint_delete_by_chunk_id(form.id, ccs);
 	ts_chunk_index_delete_by_chunk_id(form.id, true);
+	ts_compression_chunk_size_delete(form.id);
 
 	/* Check for dimension slices that are orphaned by the chunk deletion */
 	for (i = 0; i < ccs->num_constraints; i++)
diff --git a/src/compression_chunk_size.c b/src/compression_chunk_size.c
new file mode 100644
index 000000000..5fdbf7474
--- /dev/null
+++ b/src/compression_chunk_size.c
@@ -0,0 +1,40 @@
+/*
+ * This file and its contents are licensed under the Apache License 2.0.
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-APACHE for a copy of the license.
+ */
+#include <postgres.h>
+
+#include "compression_chunk_size.h"
+#include "catalog.h"
+#include "scanner.h"
+#include "scan_iterator.h"
+
+static void
+init_scan_by_uncompressed_chunk_id(ScanIterator *iterator, int32 uncompressed_chunk_id)
+{
+	iterator->ctx.index =
+		catalog_get_index(ts_catalog_get(), COMPRESSION_CHUNK_SIZE, COMPRESSION_CHUNK_SIZE_PKEY);
+	ts_scan_iterator_scan_key_init(iterator,
+								   Anum_compression_chunk_size_pkey_chunk_id,
+								   BTEqualStrategyNumber,
+								   F_INT4EQ,
+								   Int32GetDatum(uncompressed_chunk_id));
+}
+
+TSDLLEXPORT int
+ts_compression_chunk_size_delete(int32 uncompressed_chunk_id)
+{
+	ScanIterator iterator =
+		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
+	int count = 0;
+
+	init_scan_by_uncompressed_chunk_id(&iterator, uncompressed_chunk_id);
+	ts_scanner_foreach(&iterator)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
+		ts_catalog_delete(ti->scanrel, ti->tuple);
+		count++;
+	}
+	return count;
+}
diff --git a/src/compression_chunk_size.h b/src/compression_chunk_size.h
new file mode 100644
index 000000000..41084858c
--- /dev/null
+++ b/src/compression_chunk_size.h
@@ -0,0 +1,13 @@
+/*
+ * This file and its contents are licensed under the Apache License 2.0.
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-APACHE for a copy of the license.
+ */
+#ifndef TIMESCALEDB_COMPRESSION_CHUNK_SIZE_H
+#define TIMESCALEDB_COMPRESSION_CHUNK_SIZE_H
+#include <postgres.h>
+#include <export.h>
+
+extern TSDLLEXPORT int ts_compression_chunk_size_delete(int32 uncompressed_chunk_id);
+
+#endif
diff --git a/src/hypertable.c b/src/hypertable.c
index 84bebc0c9..838032493 100644
--- a/src/hypertable.c
+++ b/src/hypertable.c
@@ -40,6 +40,7 @@
 #include "dimension.h"
 #include "chunk.h"
 #include "chunk_adaptive.h"
+#include "hypertable_compression.h"
 #include "subspace_store.h"
 #include "hypertable_cache.h"
 
@@ -565,6 +566,9 @@ hypertable_tuple_delete(TupleInfo *ti, void *data)
 	/* Remove any dependent continuous aggs */
 	ts_continuous_agg_drop_hypertable_callback(hypertable_id);
 
+	/* remove any associated compression definitions */
+	ts_hypertable_compression_delete_by_hypertable_id(hypertable_id);
+
 	if (!compressed_hypertable_id_isnull)
 	{
 		Hypertable *compressed_hypertable = ts_hypertable_get_by_id(compressed_hypertable_id);
diff --git a/src/hypertable_compression.c b/src/hypertable_compression.c
index 6ff440747..9ed06cf9d 100644
--- a/src/hypertable_compression.c
+++ b/src/hypertable_compression.c
@@ -15,33 +15,48 @@ static void
 hypertable_compression_fill_from_tuple(FormData_hypertable_compression *fd, TupleInfo *ti)
 {
-	HeapTuple tuple = ti->tuple;
-	TupleDesc desc = ti->desc;
-	Datum val;
-	bool isnull;
-	memcpy((void *) fd, GETSTRUCT(tuple), sizeof(FormData_hypertable_compression));
-	/* copy the part that could have null values explictly */
-	val = heap_getattr(tuple, Anum_hypertable_compression_segmentby_column_index, desc, &isnull);
-	if (isnull)
+	Datum values[Natts_hypertable_compression];
+	bool isnulls[Natts_hypertable_compression];
+
+	heap_deform_tuple(ti->tuple, ti->desc, values, isnulls);
+
+	Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)]);
+	Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_attname)]);
+	Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_algo_id)]);
+
+	fd->hypertable_id =
+		DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)]);
+	memcpy(&fd->attname,
+		   DatumGetName(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_attname)]),
+		   NAMEDATALEN);
+	fd->algo_id =
+		DatumGetInt16(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_algo_id)]);
+
+	if (isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_segmentby_column_index)])
 		fd->segmentby_column_index = 0;
 	else
-		fd->segmentby_column_index = DatumGetInt16(val);
-	val = heap_getattr(tuple, Anum_hypertable_compression_orderby_column_index, desc, &isnull);
-	if (isnull)
+		fd->segmentby_column_index = DatumGetInt16(
+			values[AttrNumberGetAttrOffset(Anum_hypertable_compression_segmentby_column_index)]);
+
+	if (isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_column_index)])
 		fd->orderby_column_index = 0;
 	else
 	{
-		fd->orderby_column_index = DatumGetInt16(val);
-		val = heap_getattr(tuple, Anum_hypertable_compression_orderby_asc, desc, &isnull);
-		fd->orderby_asc = BoolGetDatum(val);
-		val = heap_getattr(tuple, Anum_hypertable_compression_orderby_nullsfirst, desc, &isnull);
-		fd->orderby_nullsfirst = BoolGetDatum(val);
+		Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_asc)]);
+		Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_nullsfirst)]);
+
+		fd->orderby_column_index = DatumGetInt16(
+			values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_column_index)]);
+		fd->orderby_asc =
+			DatumGetBool(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_asc)]);
+		fd->orderby_nullsfirst = DatumGetBool(
+			values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_nullsfirst)]);
 	}
 }
 
-void
-hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
-										 bool *nulls)
+TSDLLEXPORT void
+ts_hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
+											bool *nulls)
 {
 	memset(nulls, 0, sizeof(bool) * Natts_hypertable_compression);
 	values[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)] =
@@ -80,8 +95,8 @@ hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Da
 /* returns length of list and fills passed in list with pointers
  * to FormData_hypertable_compression
  */
-List *
-get_hypertablecompression_info(int32 htid)
+TSDLLEXPORT List *
+ts_hypertable_compression_get(int32 htid)
 {
 	List *fdlist = NIL;
 	FormData_hypertable_compression *colfd = NULL;
@@ -109,8 +124,8 @@ get_hypertablecompression_info(int32 htid)
 	return fdlist;
 }
 
-bool
-hypertable_compression_delete_by_hypertable_id(int32 htid)
+TSDLLEXPORT bool
+ts_hypertable_compression_delete_by_hypertable_id(int32 htid)
 {
 	int count = 0;
 	ScanIterator iterator =
diff --git a/src/hypertable_compression.h b/src/hypertable_compression.h
index ec6a76976..b1634c8c4 100644
--- a/src/hypertable_compression.h
+++ b/src/hypertable_compression.h
@@ -11,12 +11,11 @@
 #include <postgres.h>
 #include <export.h>
 
-extern TSDLLEXPORT List *get_hypertablecompression_info(int32 htid);
+extern TSDLLEXPORT List *ts_hypertable_compression_get(int32 htid);
 extern TSDLLEXPORT void
-hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
-										 bool *nulls);
+ts_hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
+											bool *nulls);
 
-extern TSDLLEXPORT bool hypertable_compression_delete_by_hypertable_id(int32 htid);
-extern TSDLLEXPORT bool ts_is_compression_hypertable(int32 hypertable_id);
+extern TSDLLEXPORT bool ts_hypertable_compression_delete_by_hypertable_id(int32 htid);
 
 #endif
diff --git a/tsl/src/compression/compress_utils.c b/tsl/src/compression/compress_utils.c
index 10abe0b6a..f4e389b38 100644
--- a/tsl/src/compression/compress_utils.c
+++ b/tsl/src/compression/compress_utils.c
@@ -25,6 +25,7 @@
 #include "scanner.h"
 #include "scan_iterator.h"
 #include "license.h"
+#include "compression_chunk_size.h"
 
 #if !PG96
 #include <utils/fmgrprotos.h>
@@ -67,34 +68,6 @@ compute_chunk_size(Oid chunk_relid)
 	return ret;
 }
 
-static void
-init_scan_by_uncompressed_chunk_id(ScanIterator *iterator, int32 uncompressed_chunk_id)
-{
-	iterator->ctx.index =
-		catalog_get_index(ts_catalog_get(), COMPRESSION_CHUNK_SIZE, COMPRESSION_CHUNK_SIZE_PKEY);
-	ts_scan_iterator_scan_key_init(iterator,
-								   Anum_compression_chunk_size_pkey_chunk_id,
-								   BTEqualStrategyNumber,
-								   F_INT4EQ,
-								   Int32GetDatum(uncompressed_chunk_id));
-}
-
-static int
-compression_chunk_size_delete(int32 uncompressed_chunk_id)
-{
-	ScanIterator iterator =
-		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
-	int count = 0;
-
-	init_scan_by_uncompressed_chunk_id(&iterator, uncompressed_chunk_id);
-	ts_scanner_foreach(&iterator)
-	{
-		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
-		ts_catalog_delete(ti->scanrel, ti->tuple);
-	}
-	return count;
-}
-
 static void
 compression_chunk_size_catalog_insert(int32 src_chunk_id, ChunkSize *src_size,
 									  int32 compress_chunk_id, ChunkSize *compress_size)
@@ -197,7 +170,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
 	LockRelationOid(catalog_get_table_id(ts_catalog_get(), CHUNK), RowExclusiveLock);
 
 	// get compression properties for hypertable
-	htcols_list = get_hypertablecompression_info(cxt.srcht->fd.id);
+	htcols_list = ts_hypertable_compression_get(cxt.srcht->fd.id);
 	htcols_listlen = list_length(htcols_list);
 	// create compressed chunk DDL and compress the data
 	compress_ht_chunk = create_compress_chunk_table(cxt.compress_ht, cxt.srcht_chunk);
@@ -274,7 +247,7 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
 	LockRelationOid(catalog_get_table_id(ts_catalog_get(), CHUNK), RowExclusiveLock);
 
 	decompress_chunk(compressed_chunk->table_id, uncompressed_chunk->table_id);
-	compression_chunk_size_delete(uncompressed_chunk->fd.id);
+	ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
 	ts_chunk_set_compressed_chunk(uncompressed_chunk, INVALID_CHUNK_ID, true);
 	ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1);
 
diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c
index 9859a3a1e..2dd85b694 100644
--- a/tsl/src/compression/create.c
+++ b/tsl/src/compression/create.c
@@ -328,7 +328,7 @@ compresscolinfo_add_catalog_entries(CompressColInfo *compress_cols, int32 htid)
 	{
 		FormData_hypertable_compression *fd = &compress_cols->col_meta[i];
 		fd->hypertable_id = htid;
-		hypertable_compression_fill_tuple_values(fd, &values[0], &nulls[0]);
+		ts_hypertable_compression_fill_tuple_values(fd, &values[0], &nulls[0]);
 		ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
 		ts_catalog_insert_values(rel, desc, values, nulls);
 		ts_catalog_restore_user(&sec_ctx);
@@ -561,7 +561,7 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
 		/* need to drop the old compressed hypertable in case the segment by columns changed (and
 		 * thus the column types of compressed hypertable need to change) */
 		ts_hypertable_drop(compressed, DROP_RESTRICT);
-		hypertable_compression_delete_by_hypertable_id(ht->fd.id);
+		ts_hypertable_compression_delete_by_hypertable_id(ht->fd.id);
 	}
 
 	compress_htid = create_compression_table(ownerid, &compress_cols);
diff --git a/tsl/src/decompress_chunk/decompress_chunk.c b/tsl/src/decompress_chunk/decompress_chunk.c
index f91dce46e..6f60704d3 100644
--- a/tsl/src/decompress_chunk/decompress_chunk.c
+++ b/tsl/src/decompress_chunk/decompress_chunk.c
@@ -53,7 +53,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
 	Index compressed_index;
 	RelOptInfo *compressed_rel;
 	Path *path;
-	List *compression_info = get_hypertablecompression_info(ht->fd.id);
+	List *compression_info = ts_hypertable_compression_get(ht->fd.id);
 	/*
 	 * since we rely on parallel coordination from the scan below
 	 * this node it is probably not beneficial to have more
diff --git a/tsl/src/decompress_chunk/exec.c b/tsl/src/decompress_chunk/exec.c
index 6211b6344..c855b81a7 100644
--- a/tsl/src/decompress_chunk/exec.c
+++ b/tsl/src/decompress_chunk/exec.c
@@ -170,7 +170,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 	if (eflags & EXEC_FLAG_BACKWARD)
 		state->reverse = !state->reverse;
 
-	state->hypertable_compression_info = get_hypertablecompression_info(state->hypertable_id);
+	state->hypertable_compression_info = ts_hypertable_compression_get(state->hypertable_id);
 
 	initialize_column_state(state);
 
diff --git a/tsl/test/expected/compression_ddl.out b/tsl/test/expected/compression_ddl.out
index 4603bed03..7f878d14d 100644
--- a/tsl/test/expected/compression_ddl.out
+++ b/tsl/test/expected/compression_ddl.out
@@ -84,6 +84,16 @@ WHERE hypertable.table_name like 'test1';
  26
 (1 row)
 
+--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
+SELECT count(*) as orphaned_compression_chunk_size
+FROM _timescaledb_catalog.compression_chunk_size size
+LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
+WHERE chunk.id IS NULL;
+ orphaned_compression_chunk_size
+---------------------------------
+                               0
+(1 row)
+
 SELECT count(*) as count_chunks_compressed
 FROM _timescaledb_catalog.chunk chunk
 INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
@@ -232,6 +242,16 @@ WHERE uncomp_hyper.table_name like 'test1';
  1
 (1 row)
 
+--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
+SELECT count(*) as orphaned_compression_chunk_size
+FROM _timescaledb_catalog.compression_chunk_size size
+LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
+WHERE chunk.id IS NULL;
+ orphaned_compression_chunk_size
+---------------------------------
+                               0
+(1 row)
+
 --
 -- DROP HYPERTABLE
 --
@@ -262,6 +282,12 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
  0
 (1 row)
 
+SELECT count(*) FROM _timescaledb_catalog.hypertable_compression;
+ count
+-------
+     0
+(1 row)
+
 ROLLBACK;
 --create a dependent object on the compressed hypertable to test cascade behaviour
 CREATE VIEW dependent_1 AS SELECT * FROM :COMPRESSED_HYPER_NAME;
@@ -269,12 +295,25 @@ CREATE VIEW dependent_1 AS SELECT * FROM :COMPRESSED_HYPER_NAME;
 \set ON_ERROR_STOP 0
 DROP TABLE :UNCOMPRESSED_HYPER_NAME;
 ERROR:  cannot drop table _timescaledb_internal._compressed_hypertable_2 because other objects depend on it
 \set ON_ERROR_STOP 1
+BEGIN;
 DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
 NOTICE:  drop cascades to 2 other objects
---verify that there are no more hypertable remaining
-SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable
+SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
  count 
 -------
      0
 (1 row)
+ROLLBACK;
+DROP VIEW dependent_1;
+--create a cont agg view on the ht as well, then the drop should nuke everything
+--TODO put back when cont aggs work
+--CREATE VIEW test1_cont_view WITH ( timescaledb.continuous, timescaledb.refresh_interval='72 hours')
+--AS SELECT time_bucket('1 hour', "Time"), SUM(i)
+--  FROM test1
+--  GROUP BY 1;
+--REFRESH MATERIALIZED VIEW test1_cont_view;
+--SELECT count(*) FROM test1_cont_view;
+--DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
+--verify that there are no more hypertable remaining
+--SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
diff --git a/tsl/test/sql/compression_ddl.sql b/tsl/test/sql/compression_ddl.sql
index 72446a56d..212b7a936 100644
--- a/tsl/test/sql/compression_ddl.sql
+++ b/tsl/test/sql/compression_ddl.sql
@@ -48,6 +48,12 @@ FROM _timescaledb_catalog.chunk chunk
 INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id)
 WHERE hypertable.table_name like 'test1';
 
+--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
+SELECT count(*) as orphaned_compression_chunk_size
+FROM _timescaledb_catalog.compression_chunk_size size
+LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
+WHERE chunk.id IS NULL;
+
 SELECT count(*) as count_chunks_compressed
 FROM _timescaledb_catalog.chunk chunk
 INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
@@ -144,6 +150,11 @@ INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id =
 INNER JOIN _timescaledb_catalog.hypertable uncomp_hyper ON (comp_hyper.id = uncomp_hyper.compressed_hypertable_id)
 WHERE uncomp_hyper.table_name like 'test1';
 
+--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
+SELECT count(*) as orphaned_compression_chunk_size
+FROM _timescaledb_catalog.compression_chunk_size size
+LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
+WHERE chunk.id IS NULL;
 
 --
 -- DROP HYPERTABLE
@@ -170,6 +181,7 @@ DROP TABLE :UNCOMPRESSED_HYPER_NAME;
 
 --verify that there are no more hypertable remaining
 SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
+SELECT count(*) FROM _timescaledb_catalog.hypertable_compression;
 ROLLBACK;
 
 --create a dependent object on the compressed hypertable to test cascade behaviour
@@ -179,7 +191,23 @@ CREATE VIEW dependent_1 AS SELECT * FROM :COMPRESSED_HYPER_NAME;
 \set ON_ERROR_STOP 0
 DROP TABLE :UNCOMPRESSED_HYPER_NAME;
 \set ON_ERROR_STOP 1
+BEGIN;
 DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
+SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
+ROLLBACK;
+DROP VIEW dependent_1;
+
+--create a cont agg view on the ht as well, then the drop should nuke everything
+--TODO put back when cont aggs work
+--CREATE VIEW test1_cont_view WITH ( timescaledb.continuous, timescaledb.refresh_interval='72 hours')
+--AS SELECT time_bucket('1 hour', "Time"), SUM(i)
+--  FROM test1
+--  GROUP BY 1;
+
+--REFRESH MATERIALIZED VIEW test1_cont_view;
+
+--SELECT count(*) FROM test1_cont_view;
+--DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
 
 --verify that there are no more hypertable remaining
-SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable
+--SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;