Fix segfault when deleting from compressed chunk

During UPDATE/DELETE on compressed hypertables, we iterate over the
plan tree to collect all scan nodes. Each scan node can have its own
filter conditions.

Prior to this patch we collected only the first filter condition and
applied it to the first chunk, which may be wrong. With this patch, as
soon as we encounter a target scan node, we immediately process the
corresponding chunk using that node's own filter conditions.
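
Roughly, the new approach is the following plan-state walker, a
simplified sketch of decompress_target_segments from this patch
(PG-version guards and several scan-node types are omitted; the
TimescaleDB headers that provide Chunk, ts_chunk_get_by_relid,
ts_chunk_is_compressed and decompress_batches_for_update_delete are
not shown):

#include <postgres.h>
#include <nodes/execnodes.h>
#include <nodes/nodeFuncs.h>  /* planstate_tree_walker */
#include <parser/parsetree.h> /* rt_fetch */

static bool
decompress_target_segments_sketch(PlanState *ps)
{
    if (ps == NULL)
        return false;

    switch (nodeTag(ps))
    {
        case T_SeqScanState:
        case T_IndexScanState:
        /* ... the other scan-state types are handled the same way ... */
        {
            /* resolve the relation this scan node reads */
            RangeTblEntry *rte =
                rt_fetch(((Scan *) ps->plan)->scanrelid, ps->state->es_range_table);
            Chunk *chunk = ts_chunk_get_by_relid(rte->relid, false);

            /* decompress this chunk immediately, using the quals that
             * belong to this scan node rather than a single shared list */
            if (chunk && ts_chunk_is_compressed(chunk))
                decompress_batches_for_update_delete(chunk, ps->plan->qual);
            break;
        }
        default:
            break;
    }

    /* recurse into child plan states */
    return planstate_tree_walker(ps, decompress_target_segments_sketch, NULL);
}

The earlier two-pass approach (collect_chunks_from_scan plus a single
shared predicate list) is removed from hypertable_modify.c below.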

Fixes #5640
Bharathy 2023-05-03 22:37:08 +05:30
parent 90f585ed7f
commit 769f9fe609
9 changed files with 252 additions and 116 deletions


@@ -139,7 +139,7 @@ typedef struct CrossModuleFunctions
PGFunction decompress_chunk;
void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
TupleTableSlot *slot);
void (*decompress_batches_for_update_delete)(List *chunks, List *predicates);
bool (*decompress_target_segments)(PlanState *ps);
/* The compression functions below are not installed in SQL as part of create extension;
* They are installed and tested during testing scripts. They are exposed in cross-module
* functions because they may be very useful for debugging customer problems if the sql


@@ -93,53 +93,6 @@ get_chunk_dispatch_states(PlanState *substate)
return NIL;
}
#if PG14_GE
typedef struct ChunkScanNodes
{
/* list of compressed chunks */
List *chunks;
/* list of conditions specified in WHERE */
List *predicates;
} ChunkScanNodes;
/*
* Traverse the plan tree to look for Scan nodes on uncompressed chunks.
* Once Scan node is found check if chunk is compressed, if so then save
* the chunk in HypertableModifyState to be used during plan execution.
* We also save the WHERE quals to get information about predicates.
*/
static bool
collect_chunks_from_scan(PlanState *ps, ChunkScanNodes *sn)
{
RangeTblEntry *rte = NULL;
Chunk *current_chunk;
if (ps == NULL || ts_guc_enable_transparent_decompression == false)
return false;
switch (nodeTag(ps))
{
case T_SeqScanState:
case T_SampleScanState:
case T_IndexScanState:
case T_IndexOnlyScanState:
case T_BitmapHeapScanState:
case T_TidScanState:
case T_TidRangeScanState:
rte = rt_fetch(((Scan *) ps->plan)->scanrelid, ps->state->es_range_table);
current_chunk = ts_chunk_get_by_relid(rte->relid, false);
if (current_chunk && ts_chunk_is_compressed(current_chunk))
{
sn->chunks = lappend(sn->chunks, current_chunk);
if (ps->plan->qual && !sn->predicates)
sn->predicates = ps->plan->qual;
}
break;
default:
break;
}
return planstate_tree_walker(ps, collect_chunks_from_scan, sn);
}
#endif
/*
* HypertableInsert (with corresponding executor node) is a plan node that
* implements INSERTs for hypertables. It is mostly a wrapper around the
@@ -799,11 +752,9 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
*/
if ((operation == CMD_DELETE || operation == CMD_UPDATE) && !ht_state->comp_chunks_processed)
{
ChunkScanNodes *sn = palloc0(sizeof(ChunkScanNodes));
collect_chunks_from_scan(pstate, sn);
if (sn->chunks && ts_cm_functions->decompress_batches_for_update_delete)
if (ts_cm_functions->decompress_target_segments)
{
ts_cm_functions->decompress_batches_for_update_delete(sn->chunks, sn->predicates);
ts_cm_functions->decompress_target_segments(pstate);
ht_state->comp_chunks_processed = true;
/*
* save snapshot set during ExecutorStart(), since this is the same
@@ -816,7 +767,6 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
/* mark rows visible */
estate->es_output_cid = GetCurrentCommandId(true);
}
pfree(sn);
}
/*
* Fetch rows from subplan, and execute the required table modification


@@ -21,8 +21,10 @@
#include <funcapi.h>
#include <libpq/pqformat.h>
#include <miscadmin.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <nodes/print.h>
#include <parser/parsetree.h>
#include <storage/lmgr.h>
#include <storage/predicate.h>
#include <utils/builtins.h>
@@ -2340,61 +2342,57 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
* 3. Move scanned rows to staging area.
* 4. Update catalog table to change status of moved chunk.
*/
void
decompress_batches_for_update_delete(List *chunks, List *predicates)
static void
decompress_batches_for_update_delete(Chunk *chunk, List *predicates)
{
/* process each chunk with its corresponding predicates */
List *filters = NIL;
List *is_null = NIL;
ListCell *ch = NULL;
ListCell *lc = NULL;
Relation chunk_rel;
Relation comp_chunk_rel;
Chunk *chunk, *comp_chunk;
Chunk *comp_chunk;
RowDecompressor decompressor;
SegmentFilter *filter;
if (predicates)
fill_predicate_context(linitial(chunks), predicates, &filters, &is_null);
foreach (ch, chunks)
bool chunk_status_changed = false;
ScanKeyData *scankeys = NULL;
Bitmapset *null_columns = NULL;
int num_scankeys = 0;
fill_predicate_context(chunk, predicates, &filters, &is_null);
chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
decompressor = build_decompressor(comp_chunk_rel, chunk_rel);
if (filters)
{
chunk = (Chunk *) lfirst(ch);
bool chunk_status_changed = false;
ScanKeyData *scankeys = NULL;
Bitmapset *null_columns = NULL;
int num_scankeys = 0;
chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);
decompressor = build_decompressor(comp_chunk_rel, chunk_rel);
if (filters)
{
scankeys =
build_update_delete_scankeys(&decompressor, filters, &num_scankeys, &null_columns);
}
if (decompress_batches(&decompressor,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed))
{
/*
* tuples from compressed chunk has been decompressed and moved
* to staging area, thus mark this chunk as partially compressed
*/
if (chunk_status_changed == true)
ts_chunk_set_partial(lfirst(ch));
}
ts_catalog_close_indexes(decompressor.indexstate);
FreeBulkInsertState(decompressor.bistate);
table_close(chunk_rel, NoLock);
table_close(comp_chunk_rel, NoLock);
scankeys =
build_update_delete_scankeys(&decompressor, filters, &num_scankeys, &null_columns);
}
if (decompress_batches(&decompressor,
scankeys,
num_scankeys,
null_columns,
is_null,
&chunk_status_changed))
{
/*
* tuples from the compressed chunk have been decompressed and moved
* to the staging area, thus mark this chunk as partially compressed
*/
if (chunk_status_changed == true)
ts_chunk_set_partial(chunk);
}
ts_catalog_close_indexes(decompressor.indexstate);
FreeBulkInsertState(decompressor.bistate);
table_close(chunk_rel, NoLock);
table_close(comp_chunk_rel, NoLock);
foreach (lc, filters)
{
@@ -2402,4 +2400,39 @@ decompress_batches_for_update_delete(List *chunks, List *predicates)
pfree(filter);
}
}
/*
* Traverse the plan tree to look for Scan nodes on uncompressed chunks.
* Once a Scan node is found, check whether the chunk is compressed; if so,
* decompress the segments that match the filter conditions, if any.
*/
bool
decompress_target_segments(PlanState *ps)
{
RangeTblEntry *rte = NULL;
Chunk *current_chunk;
if (ps == NULL)
return false;
switch (nodeTag(ps))
{
case T_SeqScanState:
case T_SampleScanState:
case T_IndexScanState:
case T_IndexOnlyScanState:
case T_BitmapHeapScanState:
case T_TidScanState:
case T_TidRangeScanState:
rte = rt_fetch(((Scan *) ps->plan)->scanrelid, ps->state->es_range_table);
current_chunk = ts_chunk_get_by_relid(rte->relid, false);
if (current_chunk && ts_chunk_is_compressed(current_chunk))
{
decompress_batches_for_update_delete(current_chunk, ps->plan->qual);
}
break;
default:
break;
}
return planstate_tree_walker(ps, decompress_target_segments, NULL);
}
#endif


@@ -307,7 +307,7 @@ typedef struct ChunkInsertState ChunkInsertState;
extern void decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk,
TupleTableSlot *slot);
#if PG14_GE
extern void decompress_batches_for_update_delete(List *chunks, List *predicates);
extern bool decompress_target_segments(PlanState *ps);
#endif
/* CompressSingleRowState methods */
struct CompressSingleRowState;


@@ -185,9 +185,9 @@ CrossModuleFunctions tsl_cm_functions = {
.decompress_chunk = tsl_decompress_chunk,
.decompress_batches_for_insert = decompress_batches_for_insert,
#if PG14_GE
.decompress_batches_for_update_delete = decompress_batches_for_update_delete,
.decompress_target_segments = decompress_target_segments,
#else
.decompress_batches_for_update_delete = NULL,
.decompress_target_segments = NULL,
#endif
.data_node_add = data_node_add,


@@ -206,10 +206,11 @@ do update set b = excluded.b;
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
3 | 111 | 20 |
10 | 12 | 12 | 12
20 | 111 | 20 |
30 | 111 | 40 |
(3 rows)
(4 rows)
--TEST2c Do DML directly on the chunk.
insert into _timescaledb_internal._hyper_1_2_chunk values(10, 12, 12, 12)
@@ -218,10 +219,11 @@ do update set b = excluded.b + 12;
SELECT * from foo ORDER BY a,b;
a | b | c | d
----+-----+----+----
3 | 111 | 20 |
10 | 24 | 12 | 12
20 | 111 | 20 |
30 | 111 | 40 |
(3 rows)
(4 rows)
update _timescaledb_internal._hyper_1_2_chunk
set b = 12;
@@ -238,9 +240,8 @@ update foo set b =20 where a = 10;
select * from _timescaledb_internal._hyper_1_2_chunk order by a,b;
a | b | c | d
----+----+----+-----
10 | 20 | 20 |
11 | 10 | 20 | 120
(2 rows)
(1 row)
delete from foo where a = 10;
select * from _timescaledb_internal._hyper_1_2_chunk order by a,b;
@@ -444,15 +445,6 @@ vacuum full conditions;
-- After vacuum, table_bytes is 0, but any associated index/toast storage is not
-- completely reclaimed. Sets it at 8K (page size). So a chunk which has
-- been compressed still incurs an overhead of n * 8KB (for every index + toast table) storage on the original uncompressed chunk.
select pg_size_pretty(table_bytes), pg_size_pretty(index_bytes),
pg_size_pretty(toast_bytes), pg_size_pretty(total_bytes)
from hypertable_detailed_size('foo');
-[ RECORD 1 ]--+-----------
pg_size_pretty | 32 kB
pg_size_pretty | 144 kB
pg_size_pretty | 8192 bytes
pg_size_pretty | 184 kB
select pg_size_pretty(table_bytes), pg_size_pretty(index_bytes),
pg_size_pretty(toast_bytes), pg_size_pretty(total_bytes)
from hypertable_detailed_size('conditions');


@@ -1722,3 +1722,108 @@ SELECT COUNT(*) = :total_rows - :total_affected_rows FROM sample_table;
(1 row)
ROLLBACK;
--github issue: 5640
CREATE TABLE tab1(filler_1 int, filler_2 int, filler_3 int, time timestamptz NOT NULL, device_id int, v0 int, v1 int, v2 float, v3 float);
CREATE INDEX ON tab1(time);
CREATE INDEX ON tab1(device_id,time);
SELECT create_hypertable('tab1','time',create_default_indexes:=false);
create_hypertable
--------------------
(21,public,tab1,t)
(1 row)
ALTER TABLE tab1 DROP COLUMN filler_1;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','57m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab1 DROP COLUMN filler_2;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id-1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-06 0:00:00+0'::timestamptz,'2000-01-12 23:55:00+0','58m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab1 DROP COLUMN filler_3;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-13 0:00:00+0'::timestamptz,'2000-01-19 23:55:00+0','59m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ANALYZE tab1;
-- compress chunks
ALTER TABLE tab1 SET (timescaledb.compress, timescaledb.compress_orderby='time DESC', timescaledb.compress_segmentby='device_id');
SELECT compress_chunk(show_chunks('tab1'));
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_21_39_chunk
_timescaledb_internal._hyper_21_40_chunk
_timescaledb_internal._hyper_21_41_chunk
(3 rows)
-- ensure there is an index scan generated for below DELETE query
BEGIN;
SELECT count(*) FROM tab1 WHERE device_id = 1;
count
-------
472
(1 row)
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 1000, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','2m') gtime(time), generate_series(1,5,1) gdevice(device_id);
SELECT count(*) FROM tab1 WHERE device_id = 1;
count
-------
4070
(1 row)
ANALYZE tab1;
EXPLAIN (costs off) DELETE FROM public.tab1 WHERE public.tab1.device_id = 1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Custom Scan (HypertableModify)
-> Delete on tab1
Delete on _hyper_21_39_chunk tab1_1
Delete on _hyper_21_40_chunk tab1_2
Delete on _hyper_21_41_chunk tab1_3
-> Append
-> Index Scan using _hyper_21_39_chunk_tab1_device_id_time_idx on _hyper_21_39_chunk tab1_1
Index Cond: (device_id = 1)
-> Seq Scan on _hyper_21_40_chunk tab1_2
Filter: (device_id = 1)
-> Seq Scan on _hyper_21_41_chunk tab1_3
Filter: (device_id = 1)
(12 rows)
DELETE FROM tab1 WHERE tab1.device_id = 1;
SELECT count(*) FROM tab1 WHERE device_id = 1;
count
-------
0
(1 row)
ROLLBACK;
-- create hypertable with space partitioning and compression
CREATE TABLE tab2(filler_1 int, filler_2 int, filler_3 int, time timestamptz NOT NULL, device_id int, v0 int, v1 int, v2 float, v3 float);
CREATE INDEX ON tab2(time);
CREATE INDEX ON tab2(device_id,time);
SELECT create_hypertable('tab2','time','device_id',3,create_default_indexes:=false);
create_hypertable
--------------------
(23,public,tab2,t)
(1 row)
ALTER TABLE tab2 DROP COLUMN filler_1;
INSERT INTO tab2(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','35m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab2 DROP COLUMN filler_2;
INSERT INTO tab2(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-06 0:00:00+0'::timestamptz,'2000-01-12 23:55:00+0','45m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab2 DROP COLUMN filler_3;
INSERT INTO tab2(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-13 0:00:00+0'::timestamptz,'2000-01-19 23:55:00+0','55m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ANALYZE tab2;
-- compress chunks
ALTER TABLE tab2 SET (timescaledb.compress, timescaledb.compress_orderby='time DESC', timescaledb.compress_segmentby='device_id');
SELECT compress_chunk(show_chunks('tab2'));
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_23_45_chunk
_timescaledb_internal._hyper_23_46_chunk
_timescaledb_internal._hyper_23_47_chunk
(3 rows)
-- below test would cause chunks of tab2 to get decompressed
-- without the fix for issue #5640
SET timescaledb.enable_optimizations = OFF;
BEGIN;
DELETE FROM tab1 t1 USING tab2 t2 WHERE t1.device_id = t2.device_id AND t2.time > '2000-01-01';
ROLLBACK;
--cleanup
RESET timescaledb.enable_optimizations;
DROP table tab1;
DROP table tab2;


@@ -184,9 +184,6 @@ vacuum full conditions;
-- been compressed still incurs an overhead of n * 8KB (for every index + toast table) storage on the original uncompressed chunk.
select pg_size_pretty(table_bytes), pg_size_pretty(index_bytes),
pg_size_pretty(toast_bytes), pg_size_pretty(total_bytes)
from hypertable_detailed_size('foo');
select pg_size_pretty(table_bytes), pg_size_pretty(index_bytes),
pg_size_pretty(toast_bytes), pg_size_pretty(total_bytes)
from hypertable_detailed_size('conditions');
select * from timescaledb_information.hypertables
where hypertable_name like 'foo' or hypertable_name like 'conditions'


@@ -941,3 +941,62 @@ SELECT COUNT(*) FROM :COMPRESS_CHUNK_1 WHERE c4 > 5 AND _ts_meta_min_2 <= 5 and
-- report true
SELECT COUNT(*) = :total_rows - :total_affected_rows FROM sample_table;
ROLLBACK;
--github issue: 5640
CREATE TABLE tab1(filler_1 int, filler_2 int, filler_3 int, time timestamptz NOT NULL, device_id int, v0 int, v1 int, v2 float, v3 float);
CREATE INDEX ON tab1(time);
CREATE INDEX ON tab1(device_id,time);
SELECT create_hypertable('tab1','time',create_default_indexes:=false);
ALTER TABLE tab1 DROP COLUMN filler_1;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','57m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab1 DROP COLUMN filler_2;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id-1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-06 0:00:00+0'::timestamptz,'2000-01-12 23:55:00+0','58m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab1 DROP COLUMN filler_3;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-13 0:00:00+0'::timestamptz,'2000-01-19 23:55:00+0','59m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ANALYZE tab1;
-- compress chunks
ALTER TABLE tab1 SET (timescaledb.compress, timescaledb.compress_orderby='time DESC', timescaledb.compress_segmentby='device_id');
SELECT compress_chunk(show_chunks('tab1'));
-- ensure there is an index scan generated for below DELETE query
BEGIN;
SELECT count(*) FROM tab1 WHERE device_id = 1;
INSERT INTO tab1(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 1000, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','2m') gtime(time), generate_series(1,5,1) gdevice(device_id);
SELECT count(*) FROM tab1 WHERE device_id = 1;
ANALYZE tab1;
EXPLAIN (costs off) DELETE FROM public.tab1 WHERE public.tab1.device_id = 1;
DELETE FROM tab1 WHERE tab1.device_id = 1;
SELECT count(*) FROM tab1 WHERE device_id = 1;
ROLLBACK;
-- create hypertable with space partitioning and compression
CREATE TABLE tab2(filler_1 int, filler_2 int, filler_3 int, time timestamptz NOT NULL, device_id int, v0 int, v1 int, v2 float, v3 float);
CREATE INDEX ON tab2(time);
CREATE INDEX ON tab2(device_id,time);
SELECT create_hypertable('tab2','time','device_id',3,create_default_indexes:=false);
ALTER TABLE tab2 DROP COLUMN filler_1;
INSERT INTO tab2(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','35m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab2 DROP COLUMN filler_2;
INSERT INTO tab2(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-06 0:00:00+0'::timestamptz,'2000-01-12 23:55:00+0','45m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ALTER TABLE tab2 DROP COLUMN filler_3;
INSERT INTO tab2(time,device_id,v0,v1,v2,v3) SELECT time, device_id, device_id+1, device_id + 2, device_id + 0.5, NULL FROM generate_series('2000-01-13 0:00:00+0'::timestamptz,'2000-01-19 23:55:00+0','55m') gtime(time), generate_series(1,1,1) gdevice(device_id);
ANALYZE tab2;
-- compress chunks
ALTER TABLE tab2 SET (timescaledb.compress, timescaledb.compress_orderby='time DESC', timescaledb.compress_segmentby='device_id');
SELECT compress_chunk(show_chunks('tab2'));
-- below test would cause chunks of tab2 to get decompressed
-- without the fix for issue #5640
SET timescaledb.enable_optimizations = OFF;
BEGIN;
DELETE FROM tab1 t1 USING tab2 t2 WHERE t1.device_id = t2.device_id AND t2.time > '2000-01-01';
ROLLBACK;
--cleanup
RESET timescaledb.enable_optimizations;
DROP table tab1;
DROP table tab2;