1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-17 19:13:16 +08:00

Fix chunk status when inserting into chunks

While executing compression operations in parallel with
inserting into chunks (both operations which can potentially
change the chunk status), we could get into situations where
the chunk status would end up inconsistent. This change re-reads
the chunk status after locking the chunk to make sure it can
decompress data when handling ON CONFLICT inserts correctly.
This commit is contained in:
Ante Kresic 2023-04-06 14:00:49 +02:00 committed by Ante Kresic
parent 54074f1fd4
commit 84b6783a19
8 changed files with 3157 additions and 8 deletions

@ -135,6 +135,18 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,
}
cis = ts_chunk_insert_state_create(chunk, dispatch);
/*
* We might have been blocked by a compression operation
* while trying to fetch the above lock so lets update the
* chunk catalog data because the status might have changed.
*
* This works even in higher levels of isolation since
* catalog data is always read from latest snapshot.
*/
chunk = ts_chunk_get_by_relid(chunk->table_id, true);
ts_set_compression_status(cis, chunk);
ts_subspace_store_add(dispatch->cache, chunk->cube, cis, destroy_chunk_insert_state);
if (found && ts_chunk_is_compressed(chunk) && !ts_chunk_is_distributed(chunk))

@ -608,10 +608,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
state->rel = rel;
state->result_relation_info = relinfo;
state->estate = dispatch->estate;
state->chunk_compressed = ts_chunk_is_compressed(chunk);
if (state->chunk_compressed)
state->chunk_partial = ts_chunk_is_partial(chunk);
ts_set_compression_status(state, chunk);
if (relinfo->ri_RelationDesc->rd_rel->relhasindex && relinfo->ri_IndexRelationDescs == NULL)
ExecOpenIndices(relinfo, onconflict_action != ONCONFLICT_NONE);
@ -718,6 +715,14 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
return state;
}
void
ts_set_compression_status(ChunkInsertState *state, const Chunk *chunk)
{
state->chunk_compressed = ts_chunk_is_compressed(chunk);
if (state->chunk_compressed)
state->chunk_partial = ts_chunk_is_partial(chunk);
}
extern void
ts_chunk_insert_state_destroy(ChunkInsertState *state)
{

@ -68,4 +68,5 @@ extern ChunkInsertState *ts_chunk_insert_state_create(const Chunk *chunk, ChunkD
extern void ts_chunk_insert_state_destroy(ChunkInsertState *state);
OnConflictAction chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
void ts_set_compression_status(ChunkInsertState *state, const Chunk *chunk);
#endif /* TIMESCALEDB_CHUNK_INSERT_STATE_H */

@ -1935,8 +1935,12 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
bms_free(key_columns);
TableScanDesc heapScan =
table_beginscan(in_rel, GetTransactionSnapshot(), num_scankeys, scankeys);
/*
* Using latest snapshot to scan the heap since we are doing this to build
* the index on the uncompressed chunks in order to do speculative insertion
* which is always built from all tuples (even in higher levels of isolation).
*/
TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);
for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
compressed_tuple != NULL;

File diff suppressed because it is too large Load Diff

@ -554,6 +554,7 @@ time|device|location|value
7| 1| 100| 20
8| 1| 100| 20
9| 1| 100| 20
1| 1| 200| 100
10| 1| 100| 20
11| 1| 100| 20
12| 1| 100| 20
@ -574,13 +575,13 @@ time|device|location|value
27| 1| 100| 20
28| 1| 100| 20
29| 1| 100| 20
(31 rows)
(32 rows)
step SChunkStat: SELECT status from _timescaledb_catalog.chunk
WHERE id = ( select min(ch.id) FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table');
status
------
1
9
(1 row)

@ -35,6 +35,7 @@ list(
APPEND
TEST_FILES
compression_ddl_iso.spec
compression_conflicts_iso.spec
cagg_insert.spec
cagg_multi_iso.spec
cagg_concurrent_refresh.spec

@ -0,0 +1,169 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.
setup
{
CREATE TABLE ts_device_table(time INTEGER, device INTEGER, location INTEGER, value INTEGER);
CREATE UNIQUE INDEX device_time_idx on ts_device_table(time, device);
SELECT create_hypertable('ts_device_table', 'time', chunk_time_interval => 10);
INSERT INTO ts_device_table SELECT generate_series(0,9,1), 1, 100, 20;
ALTER TABLE ts_device_table set(timescaledb.compress, timescaledb.compress_segmentby='location', timescaledb.compress_orderby='time');
CREATE FUNCTION lock_chunktable( name text) RETURNS void AS $$
BEGIN EXECUTE format( 'lock table %s IN SHARE MODE', name);
END; $$ LANGUAGE plpgsql;
CREATE FUNCTION count_chunktable(tbl regclass) RETURNS TABLE("count(*)" int, "count(*) only" int) AS $$
DECLARE c int;c_only int;
BEGIN
EXECUTE format('SELECT count(*) FROM %s', tbl) INTO c;
EXECUTE format('SELECT count(*) FROM ONLY %s', tbl) INTO c_only;
RETURN QUERY SELECT c,c_only;
END; $$ LANGUAGE plpgsql;
}
teardown
{
DROP TABLE ts_device_table cascade;
DROP FUNCTION lock_chunktable;
DROP FUNCTION count_chunktable;
}
session "I"
step "IB" { BEGIN; }
step "IBRR" { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; }
step "IBS" { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
step "I1" {
INSERT INTO ts_device_table VALUES (1, 1, 100, 100) ON CONFLICT DO NOTHING;
}
step "Iu1" {
INSERT INTO ts_device_table VALUES (1, 1, 100, 98) ON CONFLICT(time, device) DO UPDATE SET value = 98;
}
step "Ic" { COMMIT; }
session "IN"
step "IN1" { BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100) ON CONFLICT DO NOTHING; }
step "INu1" { BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 100, 99) ON CONFLICT(time, device) DO UPDATE SET value = 99; }
step "INc" { COMMIT; }
session "SI"
step "SChunkStat" { SELECT status from _timescaledb_catalog.chunk
WHERE id = ( select min(ch.id) FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table'); }
session "S"
step "S1" { SELECT count(*) from ts_device_table; }
step "SC1" { SELECT (count_chunktable(ch)).* FROM show_chunks('ts_device_table') AS ch LIMIT 1; }
step "SH" { SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); }
step "SA" { SELECT * FROM ts_device_table; }
step "SU" { SELECT * FROM ts_device_table WHERE value IN (98,99); }
session "LCT"
step "LockChunkTuple" {
BEGIN;
SELECT status as chunk_status from _timescaledb_catalog.chunk
WHERE id = ( select min(ch.id) FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table') FOR UPDATE;
}
step "UnlockChunkTuple" { ROLLBACK; }
session "LC"
step "LockChunk1" {
BEGIN;
SELECT
lock_chunktable(format('%I.%I',ch.schema_name, ch.table_name))
FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch
WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table'
ORDER BY ch.id LIMIT 1;
}
step "UnlockChunk" {ROLLBACK;}
session "C"
step "C1" {
BEGIN;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '10ms';
SELECT
CASE WHEN compress_chunk(format('%I.%I',ch.schema_name, ch.table_name)) IS NOT NULL THEN true ELSE false END AS compress
FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch
WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table'
ORDER BY ch.id LIMIT 1;
}
step "Cc" { COMMIT; }
session "CompressAll"
step "CA1" {
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
}
step "CAc" { COMMIT; }
session "RecompressChunk"
step "RC" {
DO $$
DECLARE
chunk_name text;
BEGIN
FOR chunk_name IN
SELECT ch FROM show_chunks('ts_device_table') ch
ORDER BY ch::text LIMIT 1
LOOP
CALL recompress_chunk(chunk_name);
END LOOP;
END;
$$;
}
#If insert is in progress, compression is blocked.
permutation "LockChunk1" "IB" "I1" "C1" "UnlockChunk" "Ic" "Cc" "SC1" "S1" "SChunkStat"
permutation "LockChunk1" "IBRR" "I1" "C1" "UnlockChunk" "Ic" "Cc" "SC1" "S1" "SChunkStat"
permutation "LockChunk1" "IBS" "I1" "C1" "UnlockChunk" "Ic" "Cc" "SC1" "S1" "SChunkStat"
permutation "LockChunk1" "IB" "Iu1" "C1" "UnlockChunk" "Ic" "Cc" "SC1" "S1" "SU" "SChunkStat"
permutation "LockChunk1" "IBRR" "Iu1" "C1" "UnlockChunk" "Ic" "Cc" "SC1" "S1" "SU" "SChunkStat"
permutation "LockChunk1" "IBS" "Iu1" "C1" "UnlockChunk" "Ic" "Cc" "SC1" "S1" "SU" "SChunkStat"
#Compress in progress, insert is blocked
permutation "LockChunk1" "C1" "IB" "I1" "UnlockChunk" "Cc" "Ic" "SC1" "SA" "SChunkStat"
permutation "LockChunk1" "C1" "IBRR" "I1" "UnlockChunk" "Cc" "Ic" "SC1" "SA" "SChunkStat"
permutation "LockChunk1" "C1" "IBS" "I1" "UnlockChunk" "Cc" "Ic" "SC1" "SA" "SChunkStat"
permutation "LockChunk1" "C1" "IB" "Iu1" "UnlockChunk" "Cc" "Ic" "SC1" "SA" "SChunkStat"
permutation "LockChunk1" "C1" "IBRR" "Iu1" "UnlockChunk" "Cc" "Ic" "SC1" "SA" "SChunkStat"
permutation "LockChunk1" "C1" "IBS" "Iu1" "UnlockChunk" "Cc" "Ic" "SC1" "SA" "SChunkStat"
# Concurrent inserts into compressed chunk will update chunk status and not error out.
permutation "C1" "Cc" "LockChunkTuple" "IB" "I1" "IN1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBRR" "I1" "IN1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBS" "I1" "IN1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IB" "I1" "INu1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SU" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBRR" "I1" "INu1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBS" "I1" "INu1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IB" "Iu1" "IN1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SU" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBRR" "Iu1" "IN1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBS" "Iu1" "IN1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IB" "Iu1" "INu1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SU" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBRR" "Iu1" "INu1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
permutation "C1" "Cc" "LockChunkTuple" "IBS" "Iu1" "INu1" "UnlockChunkTuple" "Ic" "INc" "SChunkStat" "SA"
# Testing concurrent recompress and insert.
# Insert will succeed after first phase of recompress completes.
# - First compress chunk and insert into chunk
# - Then start concurrent processes both recompress_chunk and insert
# - Wait for lock on the chunk.
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IB" "I1" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IBRR" "I1" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IBS" "I1" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IB" "Iu1" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IBRR" "Iu1" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat" "SU"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IBS" "Iu1" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat" "SU"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "IN1" "UnlockChunk" "INc" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "RC" "INu1" "UnlockChunk" "INc" "SH" "SA" "SChunkStat" "SU"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IB" "I1" "RC" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IBRR" "I1" "RC" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IBS" "I1" "RC" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IB" "Iu1" "RC" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat" "SU"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IBRR" "Iu1" "RC" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat" "SU"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IBS" "Iu1" "RC" "UnlockChunk" "Ic" "SH" "SA" "SChunkStat" "SU"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "IN1" "RC" "UnlockChunk" "INc" "SH" "SA" "SChunkStat"
permutation "CA1" "CAc" "I1" "SChunkStat" "LockChunk1" "INu1" "RC" "UnlockChunk" "INc" "SH" "SA" "SChunkStat" "SU"