From 65562f02e88e814b4823be40fd27bee245b130ae Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Sun, 29 Jan 2023 21:53:50 +0100 Subject: [PATCH] Support unique constraints on compressed chunks This patch allows unique constraints on compressed chunks. When trying to INSERT into compressed chunks with unique constraints, any potentially conflicting compressed batches will be decompressed to let postgres do constraint checking on the INSERT. With this patch, only INSERT ON CONFLICT DO NOTHING is supported. For decompression, only segmentby information is considered to determine conflicting batches. This will be enhanced in a follow-up patch to also include orderby metadata so that fewer batches need to be decompressed. --- CHANGELOG.md | 2 + src/chunk.c | 6 + src/chunk.h | 1 + src/copy.c | 2 + src/cross_module_fn.h | 3 + src/indexing.c | 2 +- src/indexing.h | 2 +- src/nodes/chunk_dispatch/chunk_dispatch.c | 28 +- src/nodes/chunk_dispatch/chunk_dispatch.h | 2 +- src/nodes/chunk_dispatch/chunk_insert_state.c | 14 +- src/process_utility.c | 16 +- tsl/src/compression/compression.c | 210 ++++++++++++- tsl/src/compression/compression.h | 5 + tsl/src/compression/create.c | 10 +- tsl/src/init.c | 1 + tsl/test/expected/compression.out | 5 +- tsl/test/expected/compression_conflicts.out | 276 ++++++++++++++++++ tsl/test/expected/compression_errors.out | 40 ++- tsl/test/sql/CMakeLists.txt | 1 + tsl/test/sql/compression.sql | 2 + tsl/test/sql/compression_conflicts.sql | 212 ++++++++++++++ tsl/test/sql/compression_errors.sql | 9 +- 22 files changed, 773 insertions(+), 76 deletions(-) create mode 100644 tsl/test/expected/compression_conflicts.out create mode 100644 tsl/test/sql/compression_conflicts.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 14d93163b..4c0d19b05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ accidentally triggering the load of a previous DB version.** * #5212 Allow pushdown of reference table joins * #5312 Add timeout support to the ping_data_node() * #5361 Add parallel support for partialize_agg() +* #5252 Improve unique constraint support on compressed hypertables +* #5312 Add timeout support to ping_data_node() **Bugfixes** * #5396 Fix SEGMENTBY columns predicates to be pushed down diff --git a/src/chunk.c b/src/chunk.c index 0556de418..d8111cae3 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -4378,6 +4378,12 @@ ts_chunk_is_compressed(const Chunk *chunk) return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED); } +bool +ts_chunk_is_distributed(const Chunk *chunk) +{ + return chunk->data_nodes != NIL; +} + /* Note that only a compressed chunk can have partial flag set */ bool ts_chunk_is_partial(const Chunk *chunk) diff --git a/src/chunk.h b/src/chunk.h index b8c20cbaa..a024d8b77 100644 --- a/src/chunk.h +++ b/src/chunk.h @@ -206,6 +206,7 @@ extern TSDLLEXPORT Chunk *ts_chunk_get_compressed_chunk_parent(const Chunk *chun extern TSDLLEXPORT bool ts_chunk_is_unordered(const Chunk *chunk); extern TSDLLEXPORT bool ts_chunk_is_partial(const Chunk *chunk); extern TSDLLEXPORT bool ts_chunk_is_compressed(const Chunk *chunk); +extern TSDLLEXPORT bool ts_chunk_is_distributed(const Chunk *chunk); extern TSDLLEXPORT bool ts_chunk_validate_chunk_status_for_operation(Oid chunk_relid, int32 chunk_status, ChunkOperation cmd, diff --git a/src/copy.c b/src/copy.c index 96068f787..cc306003f 100644 --- a/src/copy.c +++ b/src/copy.c @@ -324,6 +324,7 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf ChunkInsertState *cis = 
ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch, buffer->point, + buffer->slots[0], NULL /* on chunk changed function */, NULL /* payload for on chunk changed function */); @@ -948,6 +949,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte /* Find or create the insert state matching the point */ cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch, point, + myslot, on_chunk_insert_state_changed, bistate); diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 3e46783ae..d2a8d0b20 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -33,6 +33,7 @@ typedef struct JsonbParseState JsonbParseState; typedef struct Hypertable Hypertable; typedef struct Chunk Chunk; +typedef struct ChunkInsertState ChunkInsertState; typedef struct CopyChunkState CopyChunkState; typedef struct CrossModuleFunctions @@ -136,6 +137,8 @@ typedef struct CrossModuleFunctions PGFunction create_compressed_chunk; PGFunction compress_chunk; PGFunction decompress_chunk; + void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk, + TupleTableSlot *slot); /* The compression functions below are not installed in SQL as part of create extension; * They are installed and tested during testing scripts. They are exposed in cross-module * functions because they may be very useful for debugging customer problems if the sql diff --git a/src/indexing.c b/src/indexing.c index 7cdf7ee82..6a5ecd80d 100644 --- a/src/indexing.c +++ b/src/indexing.c @@ -270,7 +270,7 @@ indexing_create_and_verify_hypertable_indexes(const Hypertable *ht, bool create_ table_close(tblrel, AccessShareLock); } -bool +bool TSDLLEXPORT ts_indexing_relation_has_primary_or_unique_index(Relation htrel) { Bitmapset *key_attrs = RelationGetIndexAttrBitmap(htrel, INDEX_ATTR_BITMAP_KEY); diff --git a/src/indexing.h b/src/indexing.h index b70f08960..5ad981b6e 100644 --- a/src/indexing.h +++ b/src/indexing.h @@ -24,6 +24,6 @@ extern TSDLLEXPORT Oid ts_indexing_find_clustered_index(Oid table_relid); extern void ts_indexing_mark_as_valid(Oid index_id); extern bool ts_indexing_mark_as_invalid(Oid index_id); -extern bool ts_indexing_relation_has_primary_or_unique_index(Relation htrel); +extern bool TSDLLEXPORT ts_indexing_relation_has_primary_or_unique_index(Relation htrel); #endif /* TIMESCALEDB_INDEXING_H */ diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.c b/src/nodes/chunk_dispatch/chunk_dispatch.c index 7cce830b0..d65d4c9d4 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch/chunk_dispatch.c @@ -58,6 +58,7 @@ destroy_chunk_insert_state(void *cis) */ extern ChunkInsertState * ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, + TupleTableSlot *slot, const on_chunk_changed_func on_chunk_changed, void *data) { ChunkInsertState *cis; @@ -91,6 +92,7 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, * where the chunk already exists. 
*/ bool found; + Assert(slot); Chunk *chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point); #if PG14_GE @@ -106,7 +108,12 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found); } else + { found = true; + } + + if (!chunk) + elog(ERROR, "no chunk found or created"); /* get the filtered list of "available" DNs for this chunk but only if it's replicated */ if (found && dispatch->hypertable->fd.replication_factor > 1) @@ -126,12 +133,26 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, list_free(chunk_data_nodes); } - if (!chunk) - elog(ERROR, "no chunk found or created"); - cis = ts_chunk_insert_state_create(chunk, dispatch); ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state); + if (found && ts_chunk_is_compressed(chunk) && !ts_chunk_is_distributed(chunk)) + { + /* + * If this is an INSERT into a compressed chunk with UNIQUE or + * PRIMARY KEY constraints we need to make sure any batches that could + * potentially lead to a conflict are in the decompressed chunk so + * postgres can do proper constraint checking. + */ + if (ts_cm_functions->decompress_batches_for_insert) + ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot); + else + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functionality not supported under the current \"%s\" license", + ts_guc_license))); + } + MemoryContextSwitchTo(old_context); } else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis) @@ -308,6 +329,7 @@ chunk_dispatch_exec(CustomScanState *node) /* Find or create the insert state matching the point */ cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch, point, + slot, on_chunk_insert_state_changed, state); diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.h b/src/nodes/chunk_dispatch/chunk_dispatch.h index 372390eb0..8c8d18cc4 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.h +++ b/src/nodes/chunk_dispatch/chunk_dispatch.h @@ -84,7 +84,7 @@ typedef void (*on_chunk_changed_func)(ChunkInsertState *state, void *data); extern ChunkDispatch *ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags); extern void ts_chunk_dispatch_destroy(ChunkDispatch *chunk_dispatch); extern ChunkInsertState * -ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p, +ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p, TupleTableSlot *slot, const on_chunk_changed_func on_chunk_changed, void *data); extern TSDLLEXPORT Path *ts_chunk_dispatch_path_create(PlannerInfo *root, ModifyTablePath *mtpath, diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.c b/src/nodes/chunk_dispatch/chunk_insert_state.c index e0ca621fa..a275ef3a1 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.c +++ b/src/nodes/chunk_dispatch/chunk_insert_state.c @@ -597,21 +597,13 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch) CHUNK_INSERT, true); - if (has_compressed_chunk && onconflict_action != ONCONFLICT_NONE) + if (has_compressed_chunk && onconflict_action == ONCONFLICT_UPDATE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("insert with ON CONFLICT clause is not supported on " - "compressed chunks"))); + errmsg( + "INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks"))); rel = table_open(chunk->table_id, RowExclusiveLock); - if (has_compressed_chunk && 
ts_indexing_relation_has_primary_or_unique_index(rel)) - { - table_close(rel, RowExclusiveLock); - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("insert into a compressed chunk that has primary or unique constraint is " - "not supported"))); - } MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context); relinfo = create_chunk_result_relation_info(dispatch, rel); diff --git a/src/process_utility.c b/src/process_utility.c index af6db74a6..4934cea5b 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -2679,7 +2679,7 @@ process_index_start(ProcessUtilityArgs *args) hcache = ts_hypertable_cache_pin(); ht = ts_hypertable_cache_get_entry_rv(hcache, stmt->relation); - if (NULL == ht) + if (!ht) { /* Check if the relation is a Continuous Aggregate */ cagg = ts_continuous_agg_find_by_rv(stmt->relation); @@ -2702,7 +2702,7 @@ process_index_start(ProcessUtilityArgs *args) } } - if (NULL == ht) + if (!ht) { ts_cache_release(hcache); return DDL_CONTINUE; @@ -2716,18 +2716,6 @@ process_index_start(ProcessUtilityArgs *args) /* Make the RangeVar for the underlying materialization hypertable */ stmt->relation = makeRangeVar(NameStr(ht->fd.schema_name), NameStr(ht->fd.table_name), -1); } - else if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht)) - { - /* unique indexes are not allowed on compressed hypertables*/ - if (stmt->unique || stmt->primary || stmt->isconstraint) - { - ts_cache_release(hcache); - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("operation not supported on hypertables that have compression " - "enabled"))); - } - } ts_hypertable_permissions_check_by_id(ht->fd.id); add_hypertable_to_process_args(args, ht); diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 88f136e20..afe6bd61b 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -22,10 +22,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -38,17 +40,18 @@ #include "array.h" #include "chunk.h" +#include "create.h" +#include "custom_type_cache.h" #include "debug_point.h" #include "deltadelta.h" #include "dictionary.h" #include "gorilla.h" -#include "ts_catalog/compression_chunk_size.h" -#include "create.h" -#include "custom_type_cache.h" -#include "segment_meta.h" -#include "ts_catalog/hypertable_compression.h" -#include "ts_catalog/catalog.h" #include "guc.h" +#include "nodes/chunk_dispatch/chunk_insert_state.h" +#include "indexing.h" +#include "segment_meta.h" +#include "ts_catalog/compression_chunk_size.h" +#include "ts_catalog/hypertable_compression.h" #define MAX_ROWS_PER_COMPRESSION 1000 /* gap in sequence id between rows, potential for adding rows in gap later */ @@ -1403,8 +1406,11 @@ row_compressor_finish(RowCompressor *row_compressor) static SegmentInfo * segment_info_new(Form_pg_attribute column_attr) { - Oid eq_fn_oid = - lookup_type_cache(column_attr->atttypid, TYPECACHE_EQ_OPR_FINFO)->eq_opr_finfo.fn_oid; + TypeCacheEntry *tce = lookup_type_cache(column_attr->atttypid, TYPECACHE_EQ_OPR_FINFO); + + if (!OidIsValid(tce->eq_opr_finfo.fn_oid)) + elog(ERROR, "no equality function for column \"%s\"", NameStr(column_attr->attname)); + SegmentInfo *segment_info = palloc(sizeof(*segment_info)); *segment_info = (SegmentInfo){ @@ -1412,9 +1418,7 @@ segment_info_new(Form_pg_attribute column_attr) .typ_by_val = column_attr->attbyval, }; - if (!OidIsValid(eq_fn_oid)) - elog(ERROR, "no equality function for column \"%s\"", 
NameStr(column_attr->attname)); - fmgr_info_cxt(eq_fn_oid, &segment_info->eq_fn, CurrentMemoryContext); + fmgr_info_cxt(tce->eq_opr_finfo.fn_oid, &segment_info->eq_fn, CurrentMemoryContext); segment_info->eq_fcinfo = HEAP_FCINFO(2); segment_info->collation = column_attr->attcollation; @@ -1500,6 +1504,7 @@ typedef struct RowDecompressor int16 num_compressed_columns; TupleDesc in_desc; + Relation in_rel; TupleDesc out_desc; Relation out_rel; @@ -1543,6 +1548,7 @@ build_decompressor(Relation in_rel, Relation out_rel) .num_compressed_columns = in_desc->natts, .in_desc = in_desc, + .in_rel = in_rel, .out_desc = out_desc, .out_rel = out_rel, @@ -1590,7 +1596,7 @@ decompress_chunk(Oid in_table, Oid out_table) * we are compressing, so we only take an ExclusiveLock instead of AccessExclusive. */ Relation out_rel = table_open(out_table, AccessExclusiveLock); - Relation in_rel = relation_open(in_table, ExclusiveLock); + Relation in_rel = table_open(in_table, ExclusiveLock); RowDecompressor decompressor = build_decompressor(in_rel, out_rel); @@ -2029,3 +2035,183 @@ update_compressed_chunk_relstats(Oid uncompressed_relid, Oid compressed_relid) CommandCounterIncrement(); } } + +/* + * Build scankeys for decompression of specific batches. key_columns references the + * columns of the uncompressed chunk. + */ +static ScanKeyData * +build_scankeys(int32 hypertable_id, RowDecompressor decompressor, Bitmapset *key_columns, + Bitmapset **null_columns, TupleTableSlot *slot, int *num_scankeys) +{ + int key_index = 0; + ScanKeyData *scankeys = NULL; + + if (!bms_is_empty(key_columns)) + { + scankeys = palloc0(bms_num_members(key_columns) * 2 * sizeof(ScanKeyData)); + int i = -1; + while ((i = bms_next_member(key_columns, i)) > 0) + { + AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; + char *attname = get_attname(decompressor.out_rel->rd_id, attno, false); + AttrNumber cmp_attno = get_attnum(decompressor.in_rel->rd_id, attname); + FormData_hypertable_compression *fd = + ts_hypertable_compression_get_by_pkey(hypertable_id, attname); + + /* + * There are 3 possible scenarios we have to consider + * when dealing with columns which are part of unique + * constraints. + * + * 1. Column is segmentby-Column + * In this case we can add a single ScanKey with an + * equality check for the value. + * 2. Column is orderby-Column + * In this case we can add 2 ScanKeys with range constraints + * utilizing batch metadata. + * 3. Column is neither segmentby nor orderby + * In this case we cannot utilize this column for + * batch filtering as the values are compressed and + * we have no metadata. + */ + + if (COMPRESSIONCOL_IS_SEGMENT_BY(fd)) + { + bool isnull; + Datum value = slot_getattr(slot, attno, &isnull); + Oid atttypid = decompressor.out_desc->attrs[attno - 1].atttypid; + + TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_EQ_OPR_FINFO); + + /* Segmentby column type should match in compressed and uncompressed chunk */ + Assert(decompressor.out_desc->attrs[AttrNumberGetAttrOffset(attno)].atttypid == + decompressor.in_desc->attrs[AttrNumberGetAttrOffset(cmp_attno)].atttypid); + + if (!OidIsValid(tce->eq_opr_finfo.fn_oid)) + elog(ERROR, "no equality function for type \"%s\"", format_type_be(atttypid)); + + /* + * In PG versions <= 14 NULL values are always considered distinct + * from other NULL values and therefore NULLABLE multi-column + * unique constraints might expose unexpected behaviour in the + * presence of NULL values. 
+ * Since SK_SEARCHNULL is not supported by heap scans we cannot + * build a ScanKey for NOT NULL and instead have to do those + * checks manually. + */ + if (isnull) + { + *null_columns = bms_add_member(*null_columns, cmp_attno); + } + else + { + ScanKeyEntryInitialize(&scankeys[key_index], + 0, /* flags */ + cmp_attno, + BTEqualStrategyNumber, + InvalidOid, /* No strategy subtype. */ + decompressor.out_desc + ->attrs[AttrNumberGetAttrOffset(attno)] + .attcollation, + tce->eq_opr_finfo.fn_oid, + value); + key_index++; + } + } + } + } + + *num_scankeys = key_index; + return scankeys; +} + +void +decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlot *slot) +{ + Relation out_rel = cis->rel; + + if (!ts_indexing_relation_has_primary_or_unique_index(out_rel)) + { + /* + * If there are no unique constraints there is nothing to do here. + */ + return; + } + + Chunk *comp = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true); + Relation in_rel = relation_open(comp->table_id, RowExclusiveLock); + + RowDecompressor decompressor = build_decompressor(in_rel, out_rel); + Bitmapset *key_columns = RelationGetIndexAttrBitmap(out_rel, INDEX_ATTR_BITMAP_KEY); + Bitmapset *null_columns = NULL; + + int num_scankeys; + ScanKeyData *scankeys = build_scankeys(chunk->fd.hypertable_id, + decompressor, + key_columns, + &null_columns, + slot, + &num_scankeys); + + bms_free(key_columns); + + TableScanDesc heapScan = + table_beginscan(in_rel, GetTransactionSnapshot(), num_scankeys, scankeys); + + for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); + compressed_tuple != NULL; + compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) + { + Assert(HeapTupleIsValid(compressed_tuple)); + bool valid = true; + + /* + * Since the heap scan API does not support SK_SEARCHNULL we have to check + * for NULL values manually when those are part of the constraints. + */ + for (int attno = bms_next_member(null_columns, -1); attno >= 0; + attno = bms_next_member(null_columns, attno)) + { + if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc)) + { + valid = false; + break; + } + } + + /* + * Skip if NULL check failed. 
+ */ + if (!valid) + continue; + + heap_deform_tuple(compressed_tuple, + decompressor.in_desc, + decompressor.compressed_datums, + decompressor.compressed_is_nulls); + + row_decompressor_decompress_row(&decompressor); + + TM_FailureData tmfd; + TM_Result result pg_attribute_unused(); + result = table_tuple_delete(in_rel, + &compressed_tuple->t_self, + decompressor.mycid, + GetTransactionSnapshot(), + InvalidSnapshot, + true, + &tmfd, + false); + Assert(result == TM_Ok); + } + + heap_endscan(heapScan); + + ts_catalog_close_indexes(decompressor.indexstate); + FreeBulkInsertState(decompressor.bistate); + + CommandCounterIncrement(); + + table_close(in_rel, NoLock); +} diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index 4c8a8a2ea..cdc9543b5 100644 --- a/tsl/src/compression/compression.h +++ b/tsl/src/compression/compression.h @@ -153,4 +153,9 @@ extern DecompressionIterator *(*tsl_get_decompression_iterator_init( extern void update_compressed_chunk_relstats(Oid uncompressed_relid, Oid compressed_relid); extern void merge_chunk_relstats(Oid merged_relid, Oid compressed_relid); +typedef struct Chunk Chunk; +typedef struct ChunkInsertState ChunkInsertState; +extern void decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, + TupleTableSlot *slot); + #endif diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c index 5e12e5e50..c061b683c 100644 --- a/tsl/src/compression/create.c +++ b/tsl/src/compression/create.c @@ -851,13 +851,9 @@ validate_existing_constraints(Hypertable *ht, CompressColInfo *colinfo) } /* is colno a segment-by or order_by column */ else if (col_def->segmentby_column_index < 1 && col_def->orderby_column_index < 1) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("column \"%s\" must be used for segmenting or ordering", - NameStr(col_def->attname)), - errdetail("The constraint \"%s\" cannot be enforced with" - " the given compression configuration.", - NameStr(form->conname)))); + ereport(WARNING, + (errmsg("column \"%s\" should be used for segmenting or ordering", + NameStr(col_def->attname)))); } if (form->contype == CONSTRAINT_FOREIGN) diff --git a/tsl/src/init.c b/tsl/src/init.c index 3fbe69393..012c10779 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -183,6 +183,7 @@ CrossModuleFunctions tsl_cm_functions = { .process_rename_cmd = tsl_process_rename_cmd, .compress_chunk = tsl_compress_chunk, .decompress_chunk = tsl_decompress_chunk, + .decompress_batches_for_insert = decompress_batches_for_insert, .data_node_add = data_node_add, .data_node_delete = data_node_delete, diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out index e2feb0d8c..adf36649e 100644 --- a/tsl/test/expected/compression.out +++ b/tsl/test/expected/compression.out @@ -163,8 +163,9 @@ where ch1.compressed_chunk_id = ch2.id; select compress_chunk( '_timescaledb_internal._hyper_1_2_chunk'); ERROR: chunk "_hyper_1_2_chunk" is already compressed --TEST2a try DML on a compressed chunk +BEGIN; insert into foo values( 11 , 10 , 20, 120); -ERROR: insert into a compressed chunk that has primary or unique constraint is not supported +ROLLBACK; update foo set b =20 where a = 10; ERROR: cannot update/delete rows from chunk "_hyper_1_2_chunk" as it is compressed delete from foo where a = 10; @@ -207,7 +208,7 @@ ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compres insert into foo values(10, 12, 12, 12) on conflict( a, b) do update set b = excluded.b; -ERROR: insert with ON 
CONFLICT clause is not supported on compressed chunks +ERROR: INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks --TEST2c Do DML directly on the chunk. insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); update _timescaledb_internal._hyper_1_2_chunk diff --git a/tsl/test/expected/compression_conflicts.out b/tsl/test/expected/compression_conflicts.out new file mode 100644 index 000000000..40f8c0da4 --- /dev/null +++ b/tsl/test/expected/compression_conflicts.out @@ -0,0 +1,276 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- test conflict handling on compressed hypertables with unique constraints +-- test 1: single column primary key +CREATE TABLE comp_conflicts_1(time timestamptz, device text, value float, PRIMARY KEY(time)); +SELECT table_name FROM create_hypertable('comp_conflicts_1','time'); + table_name +------------------ + comp_conflicts_1 +(1 row) + +ALTER TABLE comp_conflicts_1 SET (timescaledb.compress); +-- implicitly create chunk +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +-- sanity check behaviour without compression +-- should fail due to multiple entries with same time value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "1_1_comp_conflicts_1_pkey" +INSERT INTO comp_conflicts_1 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +ERROR: duplicate key value violates unique constraint "1_1_comp_conflicts_1_pkey" +\set ON_ERROR_STOP 1 +-- should succeed since there are no conflicts in the values +BEGIN; +INSERT INTO comp_conflicts_1 VALUES +('2020-01-01 0:00:01','d1',0.1), +('2020-01-01 0:00:02','d2',0.2), +('2020-01-01 0:00:03','d3',0.3); +ROLLBACK; +SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_1') c +\gset +-- after compression no data should be in uncompressed chunk +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- repeat tests on an actual compressed chunk +-- should fail due to multiple entries with same time value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "1_1_comp_conflicts_1_pkey" +INSERT INTO comp_conflicts_1 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +ERROR: duplicate key value violates unique constraint "1_1_comp_conflicts_1_pkey" +\set ON_ERROR_STOP 1 +-- no data should be in uncompressed chunk since the inserts failed and their transaction rolled back +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- should succeed since there are no conflicts in the values +BEGIN; + INSERT INTO comp_conflicts_1 VALUES + ('2020-01-01 0:00:01','d1',0.1), + ('2020-01-01 0:00:02','d2',0.2), + ('2020-01-01 0:00:03','d3',0.3); + -- data should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + count +------- + 4 +(1 row) + +ROLLBACK; +-- no data should be in uncompressed chunk since we did rollback +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- should fail since it conflicts with existing row +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "1_1_comp_conflicts_1_pkey" +\set ON_ERROR_STOP 1 +INSERT INTO 
comp_conflicts_1 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHING; +-- data should have move into uncompressed chunk for conflict check +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 1 +(1 row) + +-- test 2: multi-column unique without segmentby +CREATE TABLE comp_conflicts_2(time timestamptz NOT NULL, device text, value float, UNIQUE(time, device)); +SELECT table_name FROM create_hypertable('comp_conflicts_2','time'); + table_name +------------------ + comp_conflicts_2 +(1 row) + +ALTER TABLE comp_conflicts_2 SET (timescaledb.compress); +WARNING: column "device" should be used for segmenting or ordering +-- implicitly create chunk +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d2',0.2); +SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_2') c +\gset +-- after compression no data should be in uncompressed chunk +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- should fail due to multiple entries with same time, device value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "3_2_comp_conflicts_2_time_device_key" +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d2',0.2); +ERROR: duplicate key value violates unique constraint "3_2_comp_conflicts_2_time_device_key" +INSERT INTO comp_conflicts_2 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +ERROR: duplicate key value violates unique constraint "3_2_comp_conflicts_2_time_device_key" +\set ON_ERROR_STOP 1 +-- no data should be in uncompressed chunk since the inserts failed and their transaction rolled back +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- should succeed since there are no conflicts in the values +BEGIN; + INSERT INTO comp_conflicts_2 VALUES + ('2020-01-01 0:00:01','d1',0.1), + ('2020-01-01 0:00:01','d2',0.2), + ('2020-01-01 0:00:01','d3',0.3); + -- data should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + count +------- + 5 +(1 row) + +ROLLBACK; +-- no data should be in uncompressed chunk since we did rollback +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- should fail since it conflicts with existing row +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "3_2_comp_conflicts_2_time_device_key" +\set ON_ERROR_STOP 1 +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHING; +-- data should have move into uncompressed chunk for conflict check +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 2 +(1 row) + +-- test 3: multi-column primary key with segmentby +CREATE TABLE comp_conflicts_3(time timestamptz NOT NULL, device text, value float, UNIQUE(time, device)); +SELECT table_name FROM create_hypertable('comp_conflicts_3','time'); + table_name +------------------ + comp_conflicts_3 +(1 row) + +ALTER TABLE comp_conflicts_3 SET (timescaledb.compress,timescaledb.compress_segmentby='device'); +-- implicitly create chunk +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d2',0.2); +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01',NULL,0.3); +SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_3') c +\gset +-- after compression no data should be in uncompressed chunk +SELECT count(*) FROM ONLY :CHUNK; + count 
+------- + 0 +(1 row) + +-- should fail due to multiple entries with same time, device value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "5_3_comp_conflicts_3_time_device_key" +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d2',0.2); +ERROR: duplicate key value violates unique constraint "5_3_comp_conflicts_3_time_device_key" +INSERT INTO comp_conflicts_3 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +ERROR: duplicate key value violates unique constraint "5_3_comp_conflicts_3_time_device_key" +\set ON_ERROR_STOP 1 +-- no data should be in uncompressed chunk since the inserts failed and their transaction rolled back +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- NULL is considered distinct from other NULL so even though the next INSERT looks +-- like a conflict it is not a constraint violation (PG15 makes NULL behaviour configurable) +BEGIN; + INSERT INTO comp_conflicts_3 VALUES ('2020-01-01',NULL,0.3); + -- data for 1 segment (count = 1 value + 1 inserted) should be present in uncompressed chunk + SELECT count(*) FROM ONLY :CHUNK; + count +------- + 2 +(1 row) + +ROLLBACK; +-- should succeed since there are no conflicts in the values +BEGIN; + INSERT INTO comp_conflicts_3 VALUES ('2020-01-01 0:00:01','d1',0.1); + -- data for 1 segment (count = 1 value + 1 inserted) should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + count +------- + 2 +(1 row) + +ROLLBACK; +BEGIN; + INSERT INTO comp_conflicts_3 VALUES + ('2020-01-01 0:00:01','d1',0.1), + ('2020-01-01 0:00:01','d2',0.2), + ('2020-01-01 0:00:01','d3',0.3); + -- data for 2 segment (count = 2 value + 2 inserted) should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + count +------- + 4 +(1 row) + +ROLLBACK; +BEGIN; + INSERT INTO comp_conflicts_3 VALUES ('2020-01-01 0:00:01','d3',0.2); + -- count = 1 since no data should have move into uncompressed chunk for conflict check since d3 is new segment + SELECT count(*) FROM ONLY :CHUNK; + count +------- + 1 +(1 row) + +ROLLBACK; +-- no data should be in uncompressed chunk since we did rollback +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 0 +(1 row) + +-- should fail since it conflicts with existing row +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1); +ERROR: duplicate key value violates unique constraint "5_3_comp_conflicts_3_time_device_key" +\set ON_ERROR_STOP 1 +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHING; +-- data should have move into uncompressed chunk for conflict check +SELECT count(*) FROM ONLY :CHUNK; + count +------- + 1 +(1 row) + diff --git a/tsl/test/expected/compression_errors.out b/tsl/test/expected/compression_errors.out index 6319655e9..ce6006053 100644 --- a/tsl/test/expected/compression_errors.out +++ b/tsl/test/expected/compression_errors.out @@ -212,10 +212,6 @@ ALTER TABLE foo ADD CONSTRAINT chk UNIQUE(b); ERROR: operation not supported on hypertables that have compression enabled ALTER TABLE foo DROP CONSTRAINT chk_existing; ERROR: operation not supported on hypertables that have compression enabled ---can add index , but not unique index -CREATE UNIQUE INDEX foo_idx ON foo ( a, c ); -ERROR: operation not supported on hypertables that have compression enabled -CREATE INDEX foo_idx ON foo ( a, c ); --note that the time column "a" should not be added 
to the end of the order by list again (should appear first) select hc.* from _timescaledb_catalog.hypertable_compression hc inner join _timescaledb_catalog.hypertable h on (h.id = hc.hypertable_id) where h.table_name = 'foo' order by attname; hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst @@ -317,13 +313,15 @@ select table_name from create_hypertable('table_constr', 'timec', chunk_time_int table_constr (1 row) +BEGIN; ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_segmentby = 'd'); -ERROR: column "device_id" must be used for segmenting or ordering -DETAIL: The constraint "table_constr_pkey" cannot be enforced with the given compression configuration. +WARNING: column "device_id" should be used for segmenting or ordering +ROLLBACK; alter table table_constr add constraint table_constr_uk unique (location, timec, device_id); +BEGIN; ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_orderby = 'timec', timescaledb.compress_segmentby = 'device_id'); -ERROR: column "location" must be used for segmenting or ordering -DETAIL: The constraint "table_constr_uk" cannot be enforced with the given compression configuration. +WARNING: column "location" should be used for segmenting or ordering +ROLLBACK; alter table table_constr add constraint table_constr_fk FOREIGN KEY(d) REFERENCES fortable(col) on delete cascade; ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_orderby = 'timec', timescaledb.compress_segmentby = 'device_id, location'); ERROR: column "d" must be used for segmenting @@ -372,7 +370,7 @@ CREATE TABLE table_fk ( SELECT create_hypertable('table_fk', 'time'); create_hypertable ------------------------ - (21,public,table_fk,t) + (23,public,table_fk,t) (1 row) ALTER TABLE table_fk DROP COLUMN id1; @@ -428,7 +426,7 @@ WHERE ch1.hypertable_id = ht.id and ht.table_name like 'table_constr2' \gset SELECT compress_chunk(:'CHUNK_NAME'); compress_chunk ------------------------------------------ - _timescaledb_internal._hyper_23_10_chunk + _timescaledb_internal._hyper_25_10_chunk (1 row) ALTER TABLE table_constr2 set (timescaledb.compress=false); @@ -438,7 +436,7 @@ DETAIL: There are compressed chunks that prevent changing the existing compress SELECT decompress_chunk(:'CHUNK_NAME'); decompress_chunk ------------------------------------------ - _timescaledb_internal._hyper_23_10_chunk + _timescaledb_internal._hyper_25_10_chunk (1 row) ALTER TABLE table_constr2 SET (timescaledb.compress=false); @@ -450,7 +448,7 @@ NOTICE: adding not-null constraint to column "time" DETAIL: Time dimensions cannot have NULL values. 
create_hypertable ------------------------------ - (25,public,test_table_int,t) + (27,public,test_table_int,t) (1 row) CREATE OR REPLACE function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 5::BIGINT'; @@ -471,7 +469,7 @@ WHERE id = :compressjob_id; SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id; config ----------------------- - {"hypertable_id": 25} + {"hypertable_id": 27} (1 row) --should fail @@ -518,7 +516,7 @@ CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 SELECT create_hypertable('metric', 'time', 'dev_id', 10); create_hypertable ---------------------- - (27,public,metric,t) + (29,public,metric,t) (1 row) ALTER TABLE metric SET ( @@ -533,7 +531,7 @@ FROM generate_series('2021-08-17 00:00:00'::timestamp, SELECT compress_chunk(show_chunks('metric')); compress_chunk ------------------------------------------ - _timescaledb_internal._hyper_27_17_chunk + _timescaledb_internal._hyper_29_17_chunk (1 row) -- column does not exist the first time @@ -559,7 +557,7 @@ WARNING: column type "timestamp without time zone" used for "time" does not fol HINT: Use datatype TIMESTAMPTZ instead. create_hypertable -------------------- - (29,public,test,t) + (31,public,test,t) (1 row) INSERT INTO test VALUES ('2001-01-01 00:00', 'home'), @@ -584,14 +582,14 @@ EXPLAIN SELECT DISTINCT 1 FROM test; ---------------------------------------------------------------------------------- Unique (cost=0.00..50.80 rows=1 width=4) -> Result (cost=0.00..50.80 rows=2040 width=4) - -> Seq Scan on _hyper_29_19_chunk (cost=0.00..30.40 rows=2040 width=0) + -> Seq Scan on _hyper_31_19_chunk (cost=0.00..30.40 rows=2040 width=0) (3 rows) --compress chunks SELECT COMPRESS_CHUNK(X) FROM SHOW_CHUNKS('test') X; compress_chunk ------------------------------------------ - _timescaledb_internal._hyper_29_19_chunk + _timescaledb_internal._hyper_31_19_chunk (1 row) --below query should pass after chunks are compressed @@ -607,8 +605,8 @@ EXPLAIN SELECT DISTINCT 1 FROM test; ------------------------------------------------------------------------------------------------------ Unique (cost=0.51..21.02 rows=1 width=4) -> Result (cost=0.51..21.02 rows=2000 width=4) - -> Custom Scan (DecompressChunk) on _hyper_29_19_chunk (cost=0.51..1.02 rows=2000 width=0) - -> Seq Scan on compress_hyper_30_20_chunk (cost=0.00..1.02 rows=2 width=4) + -> Custom Scan (DecompressChunk) on _hyper_31_19_chunk (cost=0.51..1.02 rows=2000 width=0) + -> Seq Scan on compress_hyper_32_20_chunk (cost=0.00..1.02 rows=2 width=4) (4 rows) --github issue 4398 @@ -620,7 +618,7 @@ NOTICE: adding not-null constraint to column "tm" DETAIL: Time dimensions cannot have NULL values. 
hypertable_id | schema_name | table_name | created ---------------+-------------+------------+--------- - 31 | public | ts_table | t + 33 | public | ts_table | t (1 row) --should report a warning diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 8fa8cad8a..5894a042e 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -13,6 +13,7 @@ set(TEST_FILES cagg_watermark.sql compressed_collation.sql compression_bgw.sql + compression_conflicts.sql compression_permissions.sql compression_qualpushdown.sql dist_param.sql diff --git a/tsl/test/sql/compression.sql b/tsl/test/sql/compression.sql index 03e349e61..6ade30823 100644 --- a/tsl/test/sql/compression.sql +++ b/tsl/test/sql/compression.sql @@ -55,7 +55,9 @@ where ch1.compressed_chunk_id = ch2.id; select compress_chunk( '_timescaledb_internal._hyper_1_2_chunk'); --TEST2a try DML on a compressed chunk +BEGIN; insert into foo values( 11 , 10 , 20, 120); +ROLLBACK; update foo set b =20 where a = 10; delete from foo where a = 10; diff --git a/tsl/test/sql/compression_conflicts.sql b/tsl/test/sql/compression_conflicts.sql new file mode 100644 index 000000000..5b7e7b431 --- /dev/null +++ b/tsl/test/sql/compression_conflicts.sql @@ -0,0 +1,212 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +-- test conflict handling on compressed hypertables with unique constraints + +-- test 1: single column primary key +CREATE TABLE comp_conflicts_1(time timestamptz, device text, value float, PRIMARY KEY(time)); + +SELECT table_name FROM create_hypertable('comp_conflicts_1','time'); +ALTER TABLE comp_conflicts_1 SET (timescaledb.compress); + +-- implicitly create chunk +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); + +-- sanity check behaviour without compression +-- should fail due to multiple entries with same time value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_1 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +\set ON_ERROR_STOP 1 + +-- should succeed since there are no conflicts in the values +BEGIN; +INSERT INTO comp_conflicts_1 VALUES +('2020-01-01 0:00:01','d1',0.1), +('2020-01-01 0:00:02','d2',0.2), +('2020-01-01 0:00:03','d3',0.3); +ROLLBACK; + +SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_1') c +\gset + +-- after compression no data should be in uncompressed chunk +SELECT count(*) FROM ONLY :CHUNK; + +-- repeat tests on an actual compressed chunk +-- should fail due to multiple entries with same time value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_1 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +\set ON_ERROR_STOP 1 + +-- no data should be in uncompressed chunk since the inserts failed and their transaction rolled back +SELECT count(*) FROM ONLY :CHUNK; + +-- should succeed since there are no conflicts in the values +BEGIN; + + INSERT INTO comp_conflicts_1 VALUES + ('2020-01-01 0:00:01','d1',0.1), + ('2020-01-01 0:00:02','d2',0.2), + ('2020-01-01 0:00:03','d3',0.3); + + -- data should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + +ROLLBACK; + +-- no data should be in uncompressed chunk since we did rollback +SELECT count(*) FROM ONLY :CHUNK; + +-- should fail since it 
conflicts with existing row +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1); +\set ON_ERROR_STOP 1 + +INSERT INTO comp_conflicts_1 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHING; + +-- data should have move into uncompressed chunk for conflict check +SELECT count(*) FROM ONLY :CHUNK; + +-- test 2: multi-column unique without segmentby +CREATE TABLE comp_conflicts_2(time timestamptz NOT NULL, device text, value float, UNIQUE(time, device)); + +SELECT table_name FROM create_hypertable('comp_conflicts_2','time'); +ALTER TABLE comp_conflicts_2 SET (timescaledb.compress); + +-- implicitly create chunk +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d2',0.2); + +SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_2') c +\gset + +-- after compression no data should be in uncompressed chunk +SELECT count(*) FROM ONLY :CHUNK; + +-- should fail due to multiple entries with same time, device value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d2',0.2); +INSERT INTO comp_conflicts_2 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +\set ON_ERROR_STOP 1 + +-- no data should be in uncompressed chunk since the inserts failed and their transaction rolled back +SELECT count(*) FROM ONLY :CHUNK; + +-- should succeed since there are no conflicts in the values +BEGIN; + + INSERT INTO comp_conflicts_2 VALUES + ('2020-01-01 0:00:01','d1',0.1), + ('2020-01-01 0:00:01','d2',0.2), + ('2020-01-01 0:00:01','d3',0.3); + + -- data should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + +ROLLBACK; + +-- no data should be in uncompressed chunk since we did rollback +SELECT count(*) FROM ONLY :CHUNK; + +-- should fail since it conflicts with existing row +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1); +\set ON_ERROR_STOP 1 + +INSERT INTO comp_conflicts_2 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHING; + +-- data should have move into uncompressed chunk for conflict check +SELECT count(*) FROM ONLY :CHUNK; + +-- test 3: multi-column primary key with segmentby +CREATE TABLE comp_conflicts_3(time timestamptz NOT NULL, device text, value float, UNIQUE(time, device)); + +SELECT table_name FROM create_hypertable('comp_conflicts_3','time'); +ALTER TABLE comp_conflicts_3 SET (timescaledb.compress,timescaledb.compress_segmentby='device'); + +-- implicitly create chunk +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d2',0.2); +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01',NULL,0.3); + +SELECT compress_chunk(c) AS "CHUNK" FROM show_chunks('comp_conflicts_3') c +\gset + +-- after compression no data should be in uncompressed chunk +SELECT count(*) FROM ONLY :CHUNK; + +-- should fail due to multiple entries with same time, device value +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1); +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d2',0.2); +INSERT INTO comp_conflicts_3 VALUES +('2020-01-01','d1',0.1), +('2020-01-01','d2',0.2), +('2020-01-01','d3',0.3); +\set ON_ERROR_STOP 1 + +-- no data should be in uncompressed chunk since the inserts failed and their transaction rolled back +SELECT count(*) FROM ONLY :CHUNK; + +-- NULL is considered distinct from other NULL so even though the next INSERT looks 
+-- like a conflict it is not a constraint violation (PG15 makes NULL behaviour configurable) +BEGIN; + INSERT INTO comp_conflicts_3 VALUES ('2020-01-01',NULL,0.3); + + -- data for 1 segment (count = 1 value + 1 inserted) should be present in uncompressed chunk + SELECT count(*) FROM ONLY :CHUNK; +ROLLBACK; + +-- should succeed since there are no conflicts in the values +BEGIN; + + INSERT INTO comp_conflicts_3 VALUES ('2020-01-01 0:00:01','d1',0.1); + + -- data for 1 segment (count = 1 value + 1 inserted) should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; + +ROLLBACK; + +BEGIN; + INSERT INTO comp_conflicts_3 VALUES + ('2020-01-01 0:00:01','d1',0.1), + ('2020-01-01 0:00:01','d2',0.2), + ('2020-01-01 0:00:01','d3',0.3); + + -- data for 2 segment (count = 2 value + 2 inserted) should have move into uncompressed chunk for conflict check + SELECT count(*) FROM ONLY :CHUNK; +ROLLBACK; + +BEGIN; + INSERT INTO comp_conflicts_3 VALUES ('2020-01-01 0:00:01','d3',0.2); + + -- count = 1 since no data should have move into uncompressed chunk for conflict check since d3 is new segment + SELECT count(*) FROM ONLY :CHUNK; +ROLLBACK; + +-- no data should be in uncompressed chunk since we did rollback +SELECT count(*) FROM ONLY :CHUNK; + +-- should fail since it conflicts with existing row +\set ON_ERROR_STOP 0 +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1); +\set ON_ERROR_STOP 1 + +INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHING; + +-- data should have move into uncompressed chunk for conflict check +SELECT count(*) FROM ONLY :CHUNK; + diff --git a/tsl/test/sql/compression_errors.sql b/tsl/test/sql/compression_errors.sql index 6add7ef96..b196727d3 100644 --- a/tsl/test/sql/compression_errors.sql +++ b/tsl/test/sql/compression_errors.sql @@ -105,9 +105,6 @@ ALTER TABLE foo RESET (timescaledb.compress); ALTER TABLE foo ADD CONSTRAINT chk CHECK(b > 0); ALTER TABLE foo ADD CONSTRAINT chk UNIQUE(b); ALTER TABLE foo DROP CONSTRAINT chk_existing; ---can add index , but not unique index -CREATE UNIQUE INDEX foo_idx ON foo ( a, c ); -CREATE INDEX foo_idx ON foo ( a, c ); --note that the time column "a" should not be added to the end of the order by list again (should appear first) select hc.* from _timescaledb_catalog.hypertable_compression hc inner join _timescaledb_catalog.hypertable h on (h.id = hc.hypertable_id) where h.table_name = 'foo' order by attname; @@ -167,11 +164,17 @@ create table table_constr( device_id integer, ); select table_name from create_hypertable('table_constr', 'timec', chunk_time_interval=> 10); +BEGIN; ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_segmentby = 'd'); +ROLLBACK; alter table table_constr add constraint table_constr_uk unique (location, timec, device_id); +BEGIN; ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_orderby = 'timec', timescaledb.compress_segmentby = 'device_id'); +ROLLBACK; + alter table table_constr add constraint table_constr_fk FOREIGN KEY(d) REFERENCES fortable(col) on delete cascade; ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_orderby = 'timec', timescaledb.compress_segmentby = 'device_id, location'); + --exclusion constraints not allowed alter table table_constr add constraint table_constr_exclu exclude using btree (timec with = ); ALTER TABLE table_constr set (timescaledb.compress, timescaledb.compress_orderby = 'timec', timescaledb.compress_segmentby = 'device_id, location, d');
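For reference, a minimal SQL sketch of the user-visible behaviour added by this patch, modeled on the new compression_conflicts test. It is not part of the patch itself; the table and column names (sensor_data, device, value) are illustrative and assume a server built with this change applied.

-- Unique constraints can now coexist with compression. An INSERT into a
-- compressed chunk decompresses only the potentially conflicting batches
-- (matched on segmentby values) so PostgreSQL can enforce the constraint.
CREATE TABLE sensor_data(time timestamptz NOT NULL, device text, value float,
                         UNIQUE (time, device));
SELECT table_name FROM create_hypertable('sensor_data', 'time');
ALTER TABLE sensor_data SET (timescaledb.compress,
                             timescaledb.compress_segmentby = 'device');

INSERT INTO sensor_data VALUES ('2020-01-01', 'd1', 0.1);
SELECT compress_chunk(c) FROM show_chunks('sensor_data') c;

-- Conflicting INSERT: the batch for segment 'd1' is decompressed into the
-- uncompressed chunk and the usual duplicate key error is raised.
INSERT INTO sensor_data VALUES ('2020-01-01', 'd1', 0.2);

-- Supported: ON CONFLICT DO NOTHING skips the conflicting row.
INSERT INTO sensor_data VALUES ('2020-01-01', 'd1', 0.2) ON CONFLICT DO NOTHING;

-- Still unsupported: ON CONFLICT DO UPDATE on a compressed chunk fails with
-- "INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks".
INSERT INTO sensor_data VALUES ('2020-01-01', 'd1', 0.2)
ON CONFLICT (time, device) DO UPDATE SET value = excluded.value;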