Mirror of https://github.com/timescale/timescaledb.git
Add rescan function to CompressChunkDml CustomScan node
The CompressChunkDml custom scan was missing a rescan function, leading to a segfault in plans that required a rescan of the node.
parent d9aa40d36d
commit 039607dc1a
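For context, the crash mechanism: PostgreSQL reaches a custom scan node's rescan hook through an indirect call in the executor. A paraphrased sketch of that dispatcher (based on PostgreSQL's src/backend/executor/nodeCustom.c; this is upstream PostgreSQL code, not part of this commit):

/*
 * Paraphrased sketch of PostgreSQL's custom-scan rescan dispatch
 * (src/backend/executor/nodeCustom.c). With a CustomExecMethods
 * struct that never set ReScanCustomScan, the indirect call below
 * goes through a NULL pointer in builds without assertions -- the
 * segfault this commit fixes.
 */
void
ExecReScanCustomScan(CustomScanState *node)
{
	Assert(node->methods->ReScanCustomScan != NULL);
	node->methods->ReScanCustomScan(node);
}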
@@ -30,6 +30,7 @@ static Node *compress_chunk_dml_state_create(CustomScan *scan);
 static void compress_chunk_dml_begin(CustomScanState *node, EState *estate, int eflags);
 static TupleTableSlot *compress_chunk_dml_exec(CustomScanState *node);
 static void compress_chunk_dml_end(CustomScanState *node);
+static void compress_chunk_dml_rescan(CustomScanState *node);
 
 static CustomPathMethods compress_chunk_dml_path_methods = {
 	.CustomName = "CompressChunkDml",
@@ -46,6 +47,7 @@ static CustomExecMethods compress_chunk_dml_state_methods = {
 	.BeginCustomScan = compress_chunk_dml_begin,
 	.EndCustomScan = compress_chunk_dml_end,
 	.ExecCustomScan = compress_chunk_dml_exec,
+	.ReScanCustomScan = compress_chunk_dml_rescan,
 };
 
 static void
@@ -56,6 +58,14 @@ compress_chunk_dml_begin(CustomScanState *node, EState *estate, int eflags)
 	node->custom_ps = list_make1(ExecInitNode(subplan, estate, eflags));
 }
 
+/*
+ * nothing to reset for rescan in dml blocker
+ */
+static void
+compress_chunk_dml_rescan(CustomScanState *node)
+{
+}
+
 /* we cannot update/delete rows if we have a compressed chunk. so
  * throw an error. Note this subplan will return 0 tuples as the chunk is empty
  * and all rows are saved in the compressed chunk.
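Why an empty body is enough: the blocker node carries no scan position or other per-scan state, since its exec callback raises an error for compressed chunks instead of returning tuples (see the comment in the trailing context above). A hypothetical sketch of that callback shape, for illustration only (not the actual TimescaleDB source; the chunk-name lookup and the specific errcode are assumptions):

/*
 * Hypothetical sketch, not TimescaleDB source: a DML-blocker exec
 * callback errors out rather than producing tuples, so
 * compress_chunk_dml_rescan() has no scan position to reset.
 */
static TupleTableSlot *
compress_chunk_dml_exec_sketch(CustomScanState *node)
{
	/* in real code the chunk name would come from the node's private state */
	const char *chunk_name = "_hyper_16_36_chunk";

	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("cannot update/delete rows from chunk \"%s\" as it is compressed",
					chunk_name)));
	return NULL;	/* not reached; ereport(ERROR) does not return */
}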
@@ -904,3 +904,58 @@ SELECT * FROM cagg_expr ORDER BY time LIMIT 5;
 (5 rows)
 
 ALTER TABLE metrics set(timescaledb.compress);
+-- test rescan in compress chunk dml blocker
+CREATE TABLE rescan_test(id integer NOT NULL, t timestamptz NOT NULL, val double precision, PRIMARY KEY(id, t));
+SELECT create_hypertable('rescan_test', 't', chunk_time_interval => interval '1 day');
+     create_hypertable     
+---------------------------
+ (16,public,rescan_test,t)
+(1 row)
+
+-- compression
+ALTER TABLE rescan_test SET (timescaledb.compress, timescaledb.compress_segmentby = 'id');
+NOTICE:  adding index _compressed_hypertable_17_id__ts_meta_sequence_num_idx ON _timescaledb_internal._compressed_hypertable_17 USING BTREE(id, _ts_meta_sequence_num)
+-- INSERT dummy data
+INSERT INTO rescan_test SELECT 1, time, random() FROM generate_series('2000-01-01'::timestamptz, '2000-01-05'::timestamptz, '1h'::interval) g(time);
+SELECT count(*) FROM rescan_test;
+ count 
+-------
+    97
+(1 row)
+
+-- compress first chunk
+SELECT compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
+FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id
+and ht.table_name like 'rescan_test' ORDER BY ch1.id LIMIT 1;
+              compress_chunk              
+------------------------------------------
+ _timescaledb_internal._hyper_16_36_chunk
+(1 row)
+
+-- count should be equal to count before compression
+SELECT count(*) FROM rescan_test;
+ count 
+-------
+    97
+(1 row)
+
+-- single row update is fine
+UPDATE rescan_test SET val = val + 1 WHERE rescan_test.id = 1 AND rescan_test.t = '2000-01-03 00:00:00+00';
+-- multi row update via WHERE is fine
+UPDATE rescan_test SET val = val + 1 WHERE rescan_test.id = 1 AND rescan_test.t > '2000-01-03 00:00:00+00';
+-- single row update with FROM is allowed if no compressed chunks are hit
+UPDATE rescan_test SET val = tmp.val
+FROM (SELECT x.id, x.t, x.val FROM unnest(array[(1, '2000-01-03 00:00:00+00', 2.045)]::rescan_test[]) AS x) AS tmp
+WHERE rescan_test.id = tmp.id AND rescan_test.t = tmp.t AND rescan_test.t >= '2000-01-03';
+-- single row update with FROM is blocked
+\set ON_ERROR_STOP 0
+UPDATE rescan_test SET val = tmp.val
+FROM (SELECT x.id, x.t, x.val FROM unnest(array[(1, '2000-01-03 00:00:00+00', 2.045)]::rescan_test[]) AS x) AS tmp
+WHERE rescan_test.id = tmp.id AND rescan_test.t = tmp.t;
+ERROR:  cannot update/delete rows from chunk "_hyper_16_36_chunk" as it is compressed
+-- bulk row update with FROM is blocked
+UPDATE rescan_test SET val = tmp.val
+FROM (SELECT x.id, x.t, x.val FROM unnest(array[(1, '2000-01-03 00:00:00+00', 2.045), (1, '2000-01-03 01:00:00+00', 8.045)]::rescan_test[]) AS x) AS tmp
+WHERE rescan_test.id = tmp.id AND rescan_test.t = tmp.t;
+ERROR:  cannot update/delete rows from chunk "_hyper_16_36_chunk" as it is compressed
+\set ON_ERROR_STOP 1
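The UPDATE ... FROM statements above are the regression cases this commit targets: joining rescan_test against a subquery can place the DML-blocker node on the rescanned inner side of the join plan, which segfaulted before the fix; with the no-op rescan in place, the statement either completes (no compressed chunk touched) or fails cleanly with the error shown.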
@@ -322,3 +322,49 @@ REFRESH MATERIALIZED VIEW cagg_expr;
 SELECT * FROM cagg_expr ORDER BY time LIMIT 5;
 
 ALTER TABLE metrics set(timescaledb.compress);
+
+-- test rescan in compress chunk dml blocker
+CREATE TABLE rescan_test(id integer NOT NULL, t timestamptz NOT NULL, val double precision, PRIMARY KEY(id, t));
+SELECT create_hypertable('rescan_test', 't', chunk_time_interval => interval '1 day');
+
+-- compression
+ALTER TABLE rescan_test SET (timescaledb.compress, timescaledb.compress_segmentby = 'id');
+
+-- INSERT dummy data
+INSERT INTO rescan_test SELECT 1, time, random() FROM generate_series('2000-01-01'::timestamptz, '2000-01-05'::timestamptz, '1h'::interval) g(time);
+
+
+SELECT count(*) FROM rescan_test;
+
+-- compress first chunk
+SELECT compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
+FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id
+and ht.table_name like 'rescan_test' ORDER BY ch1.id LIMIT 1;
+
+-- count should be equal to count before compression
+SELECT count(*) FROM rescan_test;
+
+-- single row update is fine
+UPDATE rescan_test SET val = val + 1 WHERE rescan_test.id = 1 AND rescan_test.t = '2000-01-03 00:00:00+00';
+
+-- multi row update via WHERE is fine
+UPDATE rescan_test SET val = val + 1 WHERE rescan_test.id = 1 AND rescan_test.t > '2000-01-03 00:00:00+00';
+
+-- single row update with FROM is allowed if no compressed chunks are hit
+UPDATE rescan_test SET val = tmp.val
+FROM (SELECT x.id, x.t, x.val FROM unnest(array[(1, '2000-01-03 00:00:00+00', 2.045)]::rescan_test[]) AS x) AS tmp
+WHERE rescan_test.id = tmp.id AND rescan_test.t = tmp.t AND rescan_test.t >= '2000-01-03';
+
+-- single row update with FROM is blocked
+\set ON_ERROR_STOP 0
+UPDATE rescan_test SET val = tmp.val
+FROM (SELECT x.id, x.t, x.val FROM unnest(array[(1, '2000-01-03 00:00:00+00', 2.045)]::rescan_test[]) AS x) AS tmp
+WHERE rescan_test.id = tmp.id AND rescan_test.t = tmp.t;
+
+-- bulk row update with FROM is blocked
+UPDATE rescan_test SET val = tmp.val
+FROM (SELECT x.id, x.t, x.val FROM unnest(array[(1, '2000-01-03 00:00:00+00', 2.045), (1, '2000-01-03 01:00:00+00', 8.045)]::rescan_test[]) AS x) AS tmp
+WHERE rescan_test.id = tmp.id AND rescan_test.t = tmp.t;
+\set ON_ERROR_STOP 1
+
+