From cc51e20e87f3ce9426b88790149cd7321cc4853e Mon Sep 17 00:00:00 2001 From: Bharathy Date: Mon, 20 Mar 2023 21:59:38 +0530 Subject: [PATCH] Add support for ON CONFLICT DO UPDATE for compressed hypertables This patch fixes execution of INSERT with ON CONFLICT DO UPDATE by removing error and allowing UPDATE do happen on the given compressed hypertable. --- CHANGELOG.md | 1 + src/nodes/chunk_dispatch/chunk_dispatch.c | 8 ++ src/nodes/chunk_dispatch/chunk_insert_state.c | 9 +- src/nodes/chunk_dispatch/chunk_insert_state.h | 1 + tsl/test/expected/compression.out | 23 +++- tsl/test/expected/compression_conflicts.out | 114 ++++++++++++++++++ tsl/test/sql/compression.sql | 7 +- tsl/test/sql/compression_conflicts.sql | 79 ++++++++++++ 8 files changed, 230 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e89e51ed9..6d4c344cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.** * #5361 Add parallel support for partialize_agg() * #5252 Improve unique constraint support on compressed hypertables * #5312 Add timeout support to ping_data_node() +* #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables **Bugfixes** * #5396 Fix SEGMENTBY columns predicates to be pushed down diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.c b/src/nodes/chunk_dispatch/chunk_dispatch.c index d65d4c9d4..2a622ba2a 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch/chunk_dispatch.c @@ -4,6 +4,7 @@ * LICENSE-APACHE for a copy of the license. */ #include +#include #include #include #include @@ -145,7 +146,14 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, * postgres can do proper constraint checking. */ if (ts_cm_functions->decompress_batches_for_insert) + { ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot); + OnConflictAction onconflict_action = + chunk_dispatch_get_on_conflict_action(dispatch); + /* mark rows visible */ + if (onconflict_action == ONCONFLICT_UPDATE) + dispatch->estate->es_output_cid = GetCurrentCommandId(true); + } else ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.c b/src/nodes/chunk_dispatch/chunk_insert_state.c index a275ef3a1..8c3d08c8c 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.c +++ b/src/nodes/chunk_dispatch/chunk_insert_state.c @@ -83,7 +83,7 @@ chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch) #endif } -static OnConflictAction +OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch) { if (!dispatch->dispatch_state) @@ -583,7 +583,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch) ALLOCSET_DEFAULT_SIZES); OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch); ResultRelInfo *relinfo; - bool has_compressed_chunk = (chunk->fd.compressed_chunk_id != 0); /* permissions NOT checked here; were checked at hypertable level */ if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED) @@ -597,12 +596,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch) CHUNK_INSERT, true); - if (has_compressed_chunk && onconflict_action == ONCONFLICT_UPDATE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks"))); - rel = table_open(chunk->table_id, RowExclusiveLock); MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context); diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.h b/src/nodes/chunk_dispatch/chunk_insert_state.h index 57956241a..794976522 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.h +++ b/src/nodes/chunk_dispatch/chunk_insert_state.h @@ -67,4 +67,5 @@ typedef struct ChunkDispatch ChunkDispatch; extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch); extern void ts_chunk_insert_state_destroy(ChunkInsertState *state); +OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch); #endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */ diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out index adf36649e..861ddace3 100644 --- a/tsl/test/expected/compression.out +++ b/tsl/test/expected/compression.out @@ -208,9 +208,26 @@ ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compres insert into foo values(10, 12, 12, 12) on conflict( a, b) do update set b = excluded.b; -ERROR: INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks +SELECT * from foo ORDER BY a,b; + a | b | c | d +----+-----+----+---- + 10 | 12 | 12 | 12 + 20 | 11 | 20 | + 30 | 222 | 40 | +(3 rows) + --TEST2c Do DML directly on the chunk. -insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); +insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12) +on conflict( a, b) +do update set b = excluded.b + 12; +SELECT * from foo ORDER BY a,b; + a | b | c | d +----+-----+----+---- + 10 | 24 | 12 | 12 + 20 | 11 | 20 | + 30 | 222 | 40 | +(3 rows) + update _timescaledb_internal._hyper_1_2_chunk set b = 12; ERROR: cannot update/delete rows from chunk "_hyper_1_2_chunk" as it is compressed @@ -229,7 +246,7 @@ ERROR: duplicate key value violates unique constraint "_hyper_1_2_chunk_foo_uni select * from _timescaledb_internal._hyper_1_2_chunk order by a; a | b | c | d ----+----+----+----- - 10 | 12 | 12 | 12 + 10 | 24 | 12 | 12 10 | 10 | 20 | 11 | 10 | 20 | 120 (3 rows) diff --git a/tsl/test/expected/compression_conflicts.out b/tsl/test/expected/compression_conflicts.out index 40f8c0da4..4e39e01f3 100644 --- a/tsl/test/expected/compression_conflicts.out +++ b/tsl/test/expected/compression_conflicts.out @@ -274,3 +274,117 @@ SELECT count(*) FROM ONLY :CHUNK; 1 (1 row) +CREATE OR REPLACE VIEW compressed_chunk_info_view AS +SELECT + h.schema_name AS hypertable_schema, + h.table_name AS hypertable_name, + c.schema_name as chunk_schema, + c.table_name as chunk_name, + c.status as chunk_status, + comp.schema_name as compressed_chunk_schema, + comp.table_name as compressed_chunk_name +FROM + _timescaledb_catalog.hypertable h JOIN + _timescaledb_catalog.chunk c ON h.id = c.hypertable_id + LEFT JOIN _timescaledb_catalog.chunk comp +ON comp.id = c.compressed_chunk_id; +CREATE TABLE compressed_ht ( + time TIMESTAMP WITH TIME ZONE NOT NULL, + sensor_id INTEGER NOT NULL, + cpu double precision null, + temperature double precision null, + name varchar(100) default 'this is a default string value' +); +CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id); +SELECT * FROM create_hypertable('compressed_ht', 'time', + chunk_time_interval => INTERVAL '2 months'); +WARNING: column type "character varying" used for "name" does not follow best practices + hypertable_id | schema_name | table_name | created +---------------+-------------+---------------+--------- + 7 | public | compressed_ht | t +(1 row) + +-- create chunk 1 +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); +INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); +-- create chunk 2 +INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2'); +INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2'); +-- create chunk 3 +INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3'); +ALTER TABLE compressed_ht SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'sensor_id' +); +SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht')); + compress_chunk +---------------------------------------- + _timescaledb_internal._hyper_7_7_chunk + _timescaledb_internal._hyper_7_8_chunk + _timescaledb_internal._hyper_7_9_chunk +(3 rows) + +-- check compression status +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + chunk_status | CHUNK_NAME +--------------+------------------ + 1 | _hyper_7_7_chunk + 1 | _hyper_7_8_chunk + 1 | _hyper_7_9_chunk +(3 rows) + +-- should report 0 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + count +------- + 0 +(1 row) + +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE'; +-- should report 1 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + count +------- + 1 +(1 row) + +-- check that chunk 1 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + chunk_status | CHUNK_NAME +--------------+------------------ + 9 | _hyper_7_7_chunk + 1 | _hyper_7_8_chunk + 1 | _hyper_7_9_chunk +(3 rows) + +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *; + time | sensor_id | cpu | temperature | name +-------------------------------------+-----------+-------+-------------+----------------------- + Sun Jan 23 11:40:28.192199 2022 PST | 6 | 0.876 | 4.123 | ON CONFLICT DO UPDATE +(1 row) + +-- check that chunks 1 and 3 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + chunk_status | CHUNK_NAME +--------------+------------------ + 9 | _hyper_7_7_chunk + 1 | _hyper_7_8_chunk + 9 | _hyper_7_9_chunk +(3 rows) + diff --git a/tsl/test/sql/compression.sql b/tsl/test/sql/compression.sql index 6ade30823..310a83a47 100644 --- a/tsl/test/sql/compression.sql +++ b/tsl/test/sql/compression.sql @@ -85,9 +85,14 @@ set b = (select f1.newval from foo_join f1 left join lateral (select newval as n insert into foo values(10, 12, 12, 12) on conflict( a, b) do update set b = excluded.b; +SELECT * from foo ORDER BY a,b; --TEST2c Do DML directly on the chunk. -insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); +insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12) +on conflict( a, b) +do update set b = excluded.b + 12; +SELECT * from foo ORDER BY a,b; + update _timescaledb_internal._hyper_1_2_chunk set b = 12; delete from _timescaledb_internal._hyper_1_2_chunk; diff --git a/tsl/test/sql/compression_conflicts.sql b/tsl/test/sql/compression_conflicts.sql index 5b7e7b431..63f757bf6 100644 --- a/tsl/test/sql/compression_conflicts.sql +++ b/tsl/test/sql/compression_conflicts.sql @@ -210,3 +210,82 @@ INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHI -- data should have move into uncompressed chunk for conflict check SELECT count(*) FROM ONLY :CHUNK; +CREATE OR REPLACE VIEW compressed_chunk_info_view AS +SELECT + h.schema_name AS hypertable_schema, + h.table_name AS hypertable_name, + c.schema_name as chunk_schema, + c.table_name as chunk_name, + c.status as chunk_status, + comp.schema_name as compressed_chunk_schema, + comp.table_name as compressed_chunk_name +FROM + _timescaledb_catalog.hypertable h JOIN + _timescaledb_catalog.chunk c ON h.id = c.hypertable_id + LEFT JOIN _timescaledb_catalog.chunk comp +ON comp.id = c.compressed_chunk_id; + +CREATE TABLE compressed_ht ( + time TIMESTAMP WITH TIME ZONE NOT NULL, + sensor_id INTEGER NOT NULL, + cpu double precision null, + temperature double precision null, + name varchar(100) default 'this is a default string value' +); +CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id); + +SELECT * FROM create_hypertable('compressed_ht', 'time', + chunk_time_interval => INTERVAL '2 months'); + +-- create chunk 1 +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); +INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); + +-- create chunk 2 +INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2'); +INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2'); + +-- create chunk 3 +INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3'); + +ALTER TABLE compressed_ht SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'sensor_id' +); + +SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht')); + +-- check compression status +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + +-- should report 0 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE'; + +-- should report 1 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + +-- check that chunk 1 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *; + +-- check that chunks 1 and 3 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;