Fix on-insert decompression for unique constraints

Inserting multiple rows into a compressed chunk could bypass the unique
constraint check when the table had segmentby columns.

Decompression is narrowed to only consider candidate batches matching
the row's actual segmentby value, and because of caching, decompression
was skipped entirely for follow-up rows landing in the same chunk.

Fixes #5553
Zoltan Haindrich 2023-04-14 14:24:42 +00:00 committed by Zoltan Haindrich
parent a49fdbcffb
commit a0df8c8e6d
4 changed files with 111 additions and 21 deletions
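
For orientation before the diffs, here is a minimal reproduction sketch of #5553. It assumes a TimescaleDB build with compression enabled; the `sensor` table, index, and values are hypothetical, chosen to mirror the regression test added below.

-- Compressed hypertable with a unique index and a segmentby column.
CREATE TABLE sensor(time timestamptz NOT NULL, device_id text);
CREATE UNIQUE INDEX ON sensor(time, device_id);
SELECT create_hypertable('sensor', 'time');
ALTER TABLE sensor SET (timescaledb.compress,
                        timescaledb.compress_segmentby = 'device_id');
INSERT INTO sensor VALUES ('2011-11-11 11:11:11', 'foo');
SELECT compress_chunk(show_chunks('sensor'));

-- The first row ('bar') matches no compressed batch, so nothing is
-- decompressed and the chunk insert state is cached; before this fix the
-- follow-up 'foo' row then skipped decompression and the duplicate slipped
-- past the unique index. With the fix, the statement fails as expected:
INSERT INTO sensor VALUES ('2011-11-11 11:12:11', 'bar'),
                          ('2011-11-11 11:11:11', 'foo');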


@@ -49,6 +49,7 @@ accidentally triggering the load of a previous DB version.**
 * #5544 Fix refresh from beginning of Continuous Aggregate with variable time bucket
 * #5558 Use regrole for job owner
 * #5542 Enable indexscan on uncompressed part of partially compressed chunks
+* #5573 Fix unique constraint on compressed tables

 **Thanks**
 * @nikolaps for reporting an issue with the COPY fetcher


@@ -64,6 +64,8 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
 {
	ChunkInsertState *cis;
	bool cis_changed = true;
+	bool found = true;
+	Chunk *chunk = NULL;

	/* Direct inserts into internal compressed hypertable is not supported.
	 * For compression chunks are created explicitly by compress_chunk and
@@ -75,15 +77,14 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
	cis = ts_subspace_store_get(dispatch->cache, point);

+	/*
+	 * The chunk search functions may leak memory, so switch to a temporary
+	 * memory context.
+	 */
+	MemoryContext old_context = MemoryContextSwitchTo(GetPerTupleMemoryContext(dispatch->estate));
+
	if (!cis)
	{
-		/*
-		 * The chunk search functions may leak memory, so switch to a temporary
-		 * memory context.
-		 */
-		MemoryContext old_context =
-			MemoryContextSwitchTo(GetPerTupleMemoryContext(dispatch->estate));
-
		/*
		 * Normally, for every row of the chunk except the first one, we expect
		 * the chunk to exist already. The "create" function would take a lock
@@ -92,9 +93,8 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
		 * locking the hypertable. This serves as a fast path for the usual case
		 * where the chunk already exists.
		 */
-		bool found;
		Assert(slot);
-		Chunk *chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
+		chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);

 #if PG14_GE
		/*
@@ -108,10 +108,6 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
		{
			chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found);
		}
-		else
-		{
-			found = true;
-		}

		if (!chunk)
			elog(ERROR, "no chunk found or created");
@@ -148,8 +144,16 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
		ts_set_compression_status(cis, chunk);
		ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state);
+	}
+	else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis)
+	{
+		/* got the same item from cache as before */
+		cis_changed = false;
+	}

-		if (found && ts_chunk_is_compressed(chunk) && !ts_chunk_is_distributed(chunk))
+	if (found)
+	{
+		if (cis->chunk_compressed && cis->chunk_data_nodes == NIL)
		{
			/*
			 * If this is an INSERT into a compressed chunk with UNIQUE or
@@ -159,6 +163,12 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
			 */
			if (ts_cm_functions->decompress_batches_for_insert)
			{
+				/* Get the chunk if it hasn't already been loaded.
+				 * It's needed by decompress_batches_for_insert,
+				 * which only uses some ids from it.
+				 */
+				if (chunk == NULL)
+					chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point);
				ts_cm_functions->decompress_batches_for_insert(cis, chunk, slot);
				OnConflictAction onconflict_action =
					chunk_dispatch_get_on_conflict_action(dispatch);
@@ -175,14 +185,9 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
						 errhint("To access all features and the best time-series "
								 "experience, try out Timescale Cloud")));
		}
+	}

-		MemoryContextSwitchTo(old_context);
-	}
-	else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis)
-	{
-		/* got the same item from cache as before */
-		cis_changed = false;
-	}
+	MemoryContextSwitchTo(old_context);

	if (cis_changed && on_chunk_changed)
		on_chunk_changed(cis, data);
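
The net effect of the restructuring above is that the decompression check now runs for cached ChunkInsertStates too, while (per the commit message) decompress_batches_for_insert still only considers batches whose segmentby value matches the incoming row. A hedged way to observe that narrowing, continuing the hypothetical `sensor` sketch from above: inserting a row with an unseen segmentby value should decompress nothing, so the chunk still reports itself as compressed.

-- No compressed batch has device_id = 'baz', so no batch is decompressed.
INSERT INTO sensor VALUES ('2011-11-11 11:14:11', 'baz');
SELECT chunk_name, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor';   -- expected: is_compressed = t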


@@ -3,6 +3,7 @@
 -- LICENSE-TIMESCALE for a copy of the license.
 \set ON_ERROR_STOP 0
 \set VERBOSITY default
+\set ECHO none
 --table with special column names --
 create table foo2 (a integer, "bacB toD" integer, c integer, d integer);
 select table_name from create_hypertable('foo2', 'a', chunk_time_interval=> 10);
@@ -660,3 +661,49 @@ SELECT readings FROM readings;
 ("Fri Nov 11 11:11:11 2022 PST",0.2)
 (2 rows)

+-- Unique constraints are not always respected on compressed tables #5553
+CREATE TABLE main_table AS
+SELECT '2011-11-11 11:11:11'::timestamptz AS time, 'foo' AS device_id;
+CREATE UNIQUE INDEX xm ON main_table(time, device_id);
+SELECT create_hypertable('main_table', 'time', chunk_time_interval => interval '12 hour', migrate_data => TRUE);
+NOTICE:  adding not-null constraint to column "time"
+NOTICE:  migrating data to chunks
+     create_hypertable
+--------------------------
+ (37,public,main_table,t)
+(1 row)
+
+ALTER TABLE main_table SET (
+    timescaledb.compress,
+    timescaledb.compress_segmentby = 'device_id',
+    timescaledb.compress_orderby = '');
+SELECT compress_chunk(show_chunks('main_table'));
+              compress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_37_25_chunk
+(1 row)
+
+-- insert rejected
+\set ON_ERROR_STOP 0
+INSERT INTO main_table VALUES
+    ('2011-11-11 11:11:11', 'foo');
+ERROR:  duplicate key value violates unique constraint "_hyper_37_25_chunk_xm"
+-- insert rejected even when the first row, which has a different segmentby value, doesn't violate the constraint
+INSERT INTO main_table VALUES
+    ('2011-11-11 11:12:11', 'bar'),
+    ('2011-11-11 11:11:11', 'foo');
+ERROR:  duplicate key value violates unique constraint "_hyper_37_25_chunk_xm"
+\set ON_ERROR_STOP 1
+SELECT assert_equal(count(1), 1::bigint) FROM main_table;
+ assert_equal
+--------------
+
+(1 row)
+
+-- no unique check failure during decompression
+SELECT decompress_chunk(show_chunks('main_table'), TRUE);
+             decompress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_37_25_chunk
+(1 row)


@@ -5,6 +5,12 @@
 \set ON_ERROR_STOP 0
 \set VERBOSITY default
+\set ECHO none
+\o /dev/null
+\ir ../../../test/sql/include/test_utils.sql
+\o
+\set ECHO all
+
 --table with special column names --
 create table foo2 (a integer, "bacB toD" integer, c integer, d integer);
 select table_name from create_hypertable('foo2', 'a', chunk_time_interval=> 10);
@@ -382,3 +388,34 @@ SELECT compress_chunk(show_chunks('readings'));
 ALTER TABLE readings DROP COLUMN battery_status;
 INSERT INTO readings ("time", battery_temperature) VALUES ('2022-11-11 11:11:11', 0.2);
 SELECT readings FROM readings;
+
+-- Unique constraints are not always respected on compressed tables #5553
+CREATE TABLE main_table AS
+SELECT '2011-11-11 11:11:11'::timestamptz AS time, 'foo' AS device_id;
+CREATE UNIQUE INDEX xm ON main_table(time, device_id);
+SELECT create_hypertable('main_table', 'time', chunk_time_interval => interval '12 hour', migrate_data => TRUE);
+
+ALTER TABLE main_table SET (
+    timescaledb.compress,
+    timescaledb.compress_segmentby = 'device_id',
+    timescaledb.compress_orderby = '');
+SELECT compress_chunk(show_chunks('main_table'));
+
+-- insert rejected
+\set ON_ERROR_STOP 0
+INSERT INTO main_table VALUES
+    ('2011-11-11 11:11:11', 'foo');
+
+-- insert rejected even when the first row, which has a different segmentby value, doesn't violate the constraint
+INSERT INTO main_table VALUES
+    ('2011-11-11 11:12:11', 'bar'),
+    ('2011-11-11 11:11:11', 'foo');
+\set ON_ERROR_STOP 1
+
+SELECT assert_equal(count(1), 1::bigint) FROM main_table;
+
+-- no unique check failure during decompression
+SELECT decompress_chunk(show_chunks('main_table'), TRUE);