diff --git a/CHANGELOG.md b/CHANGELOG.md index e89e51ed9..6d4c344cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.** * #5361 Add parallel support for partialize_agg() * #5252 Improve unique constraint support on compressed hypertables * #5312 Add timeout support to ping_data_node() +* #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables **Bugfixes** * #5396 Fix SEGMENTBY columns predicates to be pushed down diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.c b/src/nodes/chunk_dispatch/chunk_dispatch.c index d65d4c9d4..2a622ba2a 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch/chunk_dispatch.c @@ -4,6 +4,7 @@ * LICENSE-APACHE for a copy of the license. */ #include <postgres.h> +#include <access/xact.h> #include <nodes/nodes.h> #include <nodes/extensible.h> #include <nodes/makefuncs.h> @@ -145,7 +146,14 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, * postgres can do proper constraint checking. */ if (ts_cm_functions->decompress_batches_for_insert) + { ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot); + OnConflictAction onconflict_action = + chunk_dispatch_get_on_conflict_action(dispatch); + /* mark rows visible */ + if (onconflict_action == ONCONFLICT_UPDATE) + dispatch->estate->es_output_cid = GetCurrentCommandId(true); + } else ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.c b/src/nodes/chunk_dispatch/chunk_insert_state.c index a275ef3a1..8c3d08c8c 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.c +++ b/src/nodes/chunk_dispatch/chunk_insert_state.c @@ -83,7 +83,7 @@ chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch) #endif } -static OnConflictAction +OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch) { if (!dispatch->dispatch_state) @@ -583,7 +583,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch) ALLOCSET_DEFAULT_SIZES); OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch); ResultRelInfo *relinfo; - bool has_compressed_chunk = (chunk->fd.compressed_chunk_id != 0); /* permissions NOT checked here; were checked at hypertable level */ if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED) @@ -597,12 +596,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch) CHUNK_INSERT, true); - if (has_compressed_chunk && onconflict_action == ONCONFLICT_UPDATE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks"))); - rel = table_open(chunk->table_id, RowExclusiveLock); MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context); diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.h b/src/nodes/chunk_dispatch/chunk_insert_state.h index 57956241a..794976522 100644 --- a/src/nodes/chunk_dispatch/chunk_insert_state.h +++ b/src/nodes/chunk_dispatch/chunk_insert_state.h @@ -67,4 +67,5 @@ typedef struct ChunkDispatch ChunkDispatch; extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch); extern void ts_chunk_insert_state_destroy(ChunkInsertState *state); +OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch); #endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */ diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out index adf36649e..861ddace3 100644 --- a/tsl/test/expected/compression.out +++ b/tsl/test/expected/compression.out @@ -208,9 +208,26 @@ ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compres insert into foo values(10, 12, 12, 12) on conflict( a, b) do update set b = excluded.b; -ERROR: INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks +SELECT * from foo ORDER BY a,b; + a | b | c | d +----+-----+----+---- + 10 | 12 | 12 | 12 + 20 | 11 | 20 | + 30 | 222 | 40 | +(3 rows) + --TEST2c Do DML directly on the chunk. -insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); +insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12) +on conflict( a, b) +do update set b = excluded.b + 12; +SELECT * from foo ORDER BY a,b; + a | b | c | d +----+-----+----+---- + 10 | 24 | 12 | 12 + 20 | 11 | 20 | + 30 | 222 | 40 | +(3 rows) + update _timescaledb_internal._hyper_1_2_chunk set b = 12; ERROR: cannot update/delete rows from chunk "_hyper_1_2_chunk" as it is compressed @@ -229,7 +246,7 @@ ERROR: duplicate key value violates unique constraint "_hyper_1_2_chunk_foo_uni select * from _timescaledb_internal._hyper_1_2_chunk order by a; a | b | c | d ----+----+----+----- - 10 | 12 | 12 | 12 + 10 | 24 | 12 | 12 10 | 10 | 20 | 11 | 10 | 20 | 120 (3 rows) diff --git a/tsl/test/expected/compression_conflicts.out b/tsl/test/expected/compression_conflicts.out index 40f8c0da4..4e39e01f3 100644 --- a/tsl/test/expected/compression_conflicts.out +++ b/tsl/test/expected/compression_conflicts.out @@ -274,3 +274,117 @@ SELECT count(*) FROM ONLY :CHUNK; 1 (1 row) +CREATE OR REPLACE VIEW compressed_chunk_info_view AS +SELECT + h.schema_name AS hypertable_schema, + h.table_name AS hypertable_name, + c.schema_name as chunk_schema, + c.table_name as chunk_name, + c.status as chunk_status, + comp.schema_name as compressed_chunk_schema, + comp.table_name as compressed_chunk_name +FROM + _timescaledb_catalog.hypertable h JOIN + _timescaledb_catalog.chunk c ON h.id = c.hypertable_id + LEFT JOIN _timescaledb_catalog.chunk comp +ON comp.id = c.compressed_chunk_id; +CREATE TABLE compressed_ht ( + time TIMESTAMP WITH TIME ZONE NOT NULL, + sensor_id INTEGER NOT NULL, + cpu double precision null, + temperature double precision null, + name varchar(100) default 'this is a default string value' +); +CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id); +SELECT * FROM create_hypertable('compressed_ht', 'time', + chunk_time_interval => INTERVAL '2 months'); +WARNING: column type "character varying" used for "name" does not follow best practices + hypertable_id | schema_name | table_name | created +---------------+-------------+---------------+--------- + 7 | public | compressed_ht | t +(1 row) + +-- create chunk 1 +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); +INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); +-- create chunk 2 +INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2'); +INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2'); +-- create chunk 3 +INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3'); +ALTER TABLE compressed_ht SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'sensor_id' +); +SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht')); + compress_chunk +---------------------------------------- + _timescaledb_internal._hyper_7_7_chunk + _timescaledb_internal._hyper_7_8_chunk + _timescaledb_internal._hyper_7_9_chunk +(3 rows) + +-- check compression status +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + chunk_status | CHUNK_NAME +--------------+------------------ + 1 | _hyper_7_7_chunk + 1 | _hyper_7_8_chunk + 1 | _hyper_7_9_chunk +(3 rows) + +-- should report 0 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + count +------- + 0 +(1 row) + +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE'; +-- should report 1 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + count +------- + 1 +(1 row) + +-- check that chunk 1 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + chunk_status | CHUNK_NAME +--------------+------------------ + 9 | _hyper_7_7_chunk + 1 | _hyper_7_8_chunk + 1 | _hyper_7_9_chunk +(3 rows) + +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *; + time | sensor_id | cpu | temperature | name +-------------------------------------+-----------+-------+-------------+----------------------- + Sun Jan 23 11:40:28.192199 2022 PST | 6 | 0.876 | 4.123 | ON CONFLICT DO UPDATE +(1 row) + +-- check that chunks 1 and 3 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + chunk_status | CHUNK_NAME +--------------+------------------ + 9 | _hyper_7_7_chunk + 1 | _hyper_7_8_chunk + 9 | _hyper_7_9_chunk +(3 rows) + diff --git a/tsl/test/sql/compression.sql b/tsl/test/sql/compression.sql index 6ade30823..310a83a47 100644 --- a/tsl/test/sql/compression.sql +++ b/tsl/test/sql/compression.sql @@ -85,9 +85,14 @@ set b = (select f1.newval from foo_join f1 left join lateral (select newval as n insert into foo values(10, 12, 12, 12) on conflict( a, b) do update set b = excluded.b; +SELECT * from foo ORDER BY a,b; --TEST2c Do DML directly on the chunk. -insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); +insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12) +on conflict( a, b) +do update set b = excluded.b + 12; +SELECT * from foo ORDER BY a,b; + update _timescaledb_internal._hyper_1_2_chunk set b = 12; delete from _timescaledb_internal._hyper_1_2_chunk; diff --git a/tsl/test/sql/compression_conflicts.sql b/tsl/test/sql/compression_conflicts.sql index 5b7e7b431..63f757bf6 100644 --- a/tsl/test/sql/compression_conflicts.sql +++ b/tsl/test/sql/compression_conflicts.sql @@ -210,3 +210,82 @@ INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHI -- data should have move into uncompressed chunk for conflict check SELECT count(*) FROM ONLY :CHUNK; +CREATE OR REPLACE VIEW compressed_chunk_info_view AS +SELECT + h.schema_name AS hypertable_schema, + h.table_name AS hypertable_name, + c.schema_name as chunk_schema, + c.table_name as chunk_name, + c.status as chunk_status, + comp.schema_name as compressed_chunk_schema, + comp.table_name as compressed_chunk_name +FROM + _timescaledb_catalog.hypertable h JOIN + _timescaledb_catalog.chunk c ON h.id = c.hypertable_id + LEFT JOIN _timescaledb_catalog.chunk comp +ON comp.id = c.compressed_chunk_id; + +CREATE TABLE compressed_ht ( + time TIMESTAMP WITH TIME ZONE NOT NULL, + sensor_id INTEGER NOT NULL, + cpu double precision null, + temperature double precision null, + name varchar(100) default 'this is a default string value' +); +CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id); + +SELECT * FROM create_hypertable('compressed_ht', 'time', + chunk_time_interval => INTERVAL '2 months'); + +-- create chunk 1 +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); +INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1'); + +-- create chunk 2 +INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2'); +INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2'); + +-- create chunk 3 +INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3'); +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3'); + +ALTER TABLE compressed_ht SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'sensor_id' +); + +SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht')); + +-- check compression status +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + +-- should report 0 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + +INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE'; + +-- should report 1 row +SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE'; + +-- check that chunk 1 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name; + +INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row') + ON conflict(sensor_id, time) +DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *; + +-- check that chunks 1 and 3 compression status is set to partial +SELECT chunk_status, + chunk_name as "CHUNK_NAME" +FROM compressed_chunk_info_view +WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;