Add support for ON CONFLICT DO UPDATE for compressed hypertables

This patch fixes execution of INSERT with ON CONFLICT DO UPDATE by
removing error and allowing UPDATE do happen on the given compressed
hypertable.
This commit is contained in:
Bharathy 2023-03-20 21:59:38 +05:30
parent 8cccc375fb
commit cc51e20e87
8 changed files with 230 additions and 12 deletions

View File

@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.**
* #5361 Add parallel support for partialize_agg() * #5361 Add parallel support for partialize_agg()
* #5252 Improve unique constraint support on compressed hypertables * #5252 Improve unique constraint support on compressed hypertables
* #5312 Add timeout support to ping_data_node() * #5312 Add timeout support to ping_data_node()
* #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables
**Bugfixes** **Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down * #5396 Fix SEGMENTBY columns predicates to be pushed down

View File

@ -4,6 +4,7 @@
* LICENSE-APACHE for a copy of the license. * LICENSE-APACHE for a copy of the license.
*/ */
#include <postgres.h> #include <postgres.h>
#include <access/xact.h>
#include <nodes/nodes.h> #include <nodes/nodes.h>
#include <nodes/extensible.h> #include <nodes/extensible.h>
#include <nodes/makefuncs.h> #include <nodes/makefuncs.h>
@ -145,7 +146,14 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
* postgres can do proper constraint checking. * postgres can do proper constraint checking.
*/ */
if (ts_cm_functions->decompress_batches_for_insert) if (ts_cm_functions->decompress_batches_for_insert)
{
ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot); ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot);
OnConflictAction onconflict_action =
chunk_dispatch_get_on_conflict_action(dispatch);
/* mark rows visible */
if (onconflict_action == ONCONFLICT_UPDATE)
dispatch->estate->es_output_cid = GetCurrentCommandId(true);
}
else else
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

View File

@ -83,7 +83,7 @@ chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch)
#endif #endif
} }
static OnConflictAction OnConflictAction
chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch) chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
{ {
if (!dispatch->dispatch_state) if (!dispatch->dispatch_state)
@ -583,7 +583,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
ALLOCSET_DEFAULT_SIZES); ALLOCSET_DEFAULT_SIZES);
OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch); OnConflictAction onconflict_action = chunk_dispatch_get_on_conflict_action(dispatch);
ResultRelInfo *relinfo; ResultRelInfo *relinfo;
bool has_compressed_chunk = (chunk->fd.compressed_chunk_id != 0);
/* permissions NOT checked here; were checked at hypertable level */ /* permissions NOT checked here; were checked at hypertable level */
if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED) if (check_enable_rls(chunk->table_id, InvalidOid, false) == RLS_ENABLED)
@ -597,12 +596,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
CHUNK_INSERT, CHUNK_INSERT,
true); true);
if (has_compressed_chunk && onconflict_action == ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks")));
rel = table_open(chunk->table_id, RowExclusiveLock); rel = table_open(chunk->table_id, RowExclusiveLock);
MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context); MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context);

View File

@ -67,4 +67,5 @@ typedef struct ChunkDispatch ChunkDispatch;
extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch); extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch);
extern void ts_chunk_insert_state_destroy(ChunkInsertState *state); extern void ts_chunk_insert_state_destroy(ChunkInsertState *state);
OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
#endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */ #endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */

View File

@ -208,9 +208,26 @@ ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compres
insert into foo values(10, 12, 12, 12) insert into foo values(10, 12, 12, 12)
on conflict( a, b) on conflict( a, b)
do update set b = excluded.b; do update set b = excluded.b;
ERROR: INSERT with ON CONFLICT DO UPDATE is not supported on compressed chunks SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
10 | 12 | 12 | 12
20 | 11 | 20 |
30 | 222 | 40 |
(3 rows)
--TEST2c Do DML directly on the chunk. --TEST2c Do DML directly on the chunk.
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12)
on conflict( a, b)
do update set b = excluded.b + 12;
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
10 | 24 | 12 | 12
20 | 11 | 20 |
30 | 222 | 40 |
(3 rows)
update _timescaledb_internal._hyper_1_2_chunk update _timescaledb_internal._hyper_1_2_chunk
set b = 12; set b = 12;
ERROR: cannot update/delete rows from chunk "_hyper_1_2_chunk" as it is compressed ERROR: cannot update/delete rows from chunk "_hyper_1_2_chunk" as it is compressed
@ -229,7 +246,7 @@ ERROR: duplicate key value violates unique constraint "_hyper_1_2_chunk_foo_uni
select * from _timescaledb_internal._hyper_1_2_chunk order by a; select * from _timescaledb_internal._hyper_1_2_chunk order by a;
a | b | c | d a | b | c | d
----+----+----+----- ----+----+----+-----
10 | 12 | 12 | 12 10 | 24 | 12 | 12
10 | 10 | 20 | 10 | 10 | 20 |
11 | 10 | 20 | 120 11 | 10 | 20 | 120
(3 rows) (3 rows)

View File

@ -274,3 +274,117 @@ SELECT count(*) FROM ONLY :CHUNK;
1 1
(1 row) (1 row)
CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
h.table_name AS hypertable_name,
c.schema_name as chunk_schema,
c.table_name as chunk_name,
c.status as chunk_status,
comp.schema_name as compressed_chunk_schema,
comp.table_name as compressed_chunk_name
FROM
_timescaledb_catalog.hypertable h JOIN
_timescaledb_catalog.chunk c ON h.id = c.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk comp
ON comp.id = c.compressed_chunk_id;
CREATE TABLE compressed_ht (
time TIMESTAMP WITH TIME ZONE NOT NULL,
sensor_id INTEGER NOT NULL,
cpu double precision null,
temperature double precision null,
name varchar(100) default 'this is a default string value'
);
CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id);
SELECT * FROM create_hypertable('compressed_ht', 'time',
chunk_time_interval => INTERVAL '2 months');
WARNING: column type "character varying" used for "name" does not follow best practices
hypertable_id | schema_name | table_name | created
---------------+-------------+---------------+---------
7 | public | compressed_ht | t
(1 row)
-- create chunk 1
INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
-- create chunk 2
INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2');
INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2');
-- create chunk 3
INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3');
ALTER TABLE compressed_ht SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id'
);
SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht'));
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_7_7_chunk
_timescaledb_internal._hyper_7_8_chunk
_timescaledb_internal._hyper_7_9_chunk
(3 rows)
-- check compression status
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
1 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
1 | _hyper_7_9_chunk
(3 rows)
-- should report 0 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';
count
-------
0
(1 row)
INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE';
-- should report 1 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';
count
-------
1
(1 row)
-- check that chunk 1 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
1 | _hyper_7_9_chunk
(3 rows)
INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *;
time | sensor_id | cpu | temperature | name
-------------------------------------+-----------+-------+-------------+-----------------------
Sun Jan 23 11:40:28.192199 2022 PST | 6 | 0.876 | 4.123 | ON CONFLICT DO UPDATE
(1 row)
-- check that chunks 1 and 3 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+------------------
9 | _hyper_7_7_chunk
1 | _hyper_7_8_chunk
9 | _hyper_7_9_chunk
(3 rows)

View File

@ -85,9 +85,14 @@ set b = (select f1.newval from foo_join f1 left join lateral (select newval as n
insert into foo values(10, 12, 12, 12) insert into foo values(10, 12, 12, 12)
on conflict( a, b) on conflict( a, b)
do update set b = excluded.b; do update set b = excluded.b;
SELECT * from foo ORDER BY a,b;
--TEST2c Do DML directly on the chunk. --TEST2c Do DML directly on the chunk.
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12); insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12)
on conflict( a, b)
do update set b = excluded.b + 12;
SELECT * from foo ORDER BY a,b;
update _timescaledb_internal._hyper_1_2_chunk update _timescaledb_internal._hyper_1_2_chunk
set b = 12; set b = 12;
delete from _timescaledb_internal._hyper_1_2_chunk; delete from _timescaledb_internal._hyper_1_2_chunk;

View File

@ -210,3 +210,82 @@ INSERT INTO comp_conflicts_3 VALUES ('2020-01-01','d1',0.1) ON CONFLICT DO NOTHI
-- data should have move into uncompressed chunk for conflict check -- data should have move into uncompressed chunk for conflict check
SELECT count(*) FROM ONLY :CHUNK; SELECT count(*) FROM ONLY :CHUNK;
CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
h.table_name AS hypertable_name,
c.schema_name as chunk_schema,
c.table_name as chunk_name,
c.status as chunk_status,
comp.schema_name as compressed_chunk_schema,
comp.table_name as compressed_chunk_name
FROM
_timescaledb_catalog.hypertable h JOIN
_timescaledb_catalog.chunk c ON h.id = c.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk comp
ON comp.id = c.compressed_chunk_id;
CREATE TABLE compressed_ht (
time TIMESTAMP WITH TIME ZONE NOT NULL,
sensor_id INTEGER NOT NULL,
cpu double precision null,
temperature double precision null,
name varchar(100) default 'this is a default string value'
);
CREATE UNIQUE INDEX sensor_id_time_idx on compressed_ht(time, sensor_id);
SELECT * FROM create_hypertable('compressed_ht', 'time',
chunk_time_interval => INTERVAL '2 months');
-- create chunk 1
INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
INSERT INTO compressed_ht VALUES ('2017-12-24 01:10:28.192199+05:30', '1', 0.876, 4.123, 'chunk 1');
-- create chunk 2
INSERT INTO compressed_ht VALUES ('2017-03-28 01:10:28.192199+05:30', '2', 0.876, 4.123, 'chunk 2');
INSERT INTO compressed_ht VALUES ('2017-03-12 01:10:28.192199+05:30', '3', 0.876, 4.123, 'chunk 2');
-- create chunk 3
INSERT INTO compressed_ht VALUES ('2022-01-18 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-08 01:10:28.192199+05:30', '4', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-11 01:10:28.192199+05:30', '5', 0.876, 4.123, 'chunk 3');
INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'chunk 3');
ALTER TABLE compressed_ht SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id'
);
SELECT COMPRESS_CHUNK(SHOW_CHUNKS('compressed_ht'));
-- check compression status
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
-- should report 0 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';
INSERT INTO compressed_ht VALUES ('2017-12-28 01:10:28.192199+05:30', '1', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE';
-- should report 1 row
SELECT COUNT(*) FROM compressed_ht WHERE name = 'ON CONFLICT DO UPDATE';
-- check that chunk 1 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;
INSERT INTO compressed_ht VALUES ('2022-01-24 01:10:28.192199+05:30', '6', 0.876, 4.123, 'new insert row')
ON conflict(sensor_id, time)
DO UPDATE SET sensor_id = excluded.sensor_id , name = 'ON CONFLICT DO UPDATE' RETURNING *;
-- check that chunks 1 and 3 compression status is set to partial
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'compressed_ht' ORDER BY chunk_name;