diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a401e755..5dfc84529 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ argument or resolve the type ambiguity by casting to the intended type.
 * #4681 Fix compression_chunk_size primary key
 * #4685 Improve chunk exclusion for space dimensions
 * #4696 Report warning when enabling compression on hypertable
+* #4676 Fix a deadlock when decompressing chunks and performing SELECTs
 
 **Thanks**
 * @maxtwardowski for reporting problems with chunk exclusion and space dimensions
diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c
index ce00db1a7..1666320a9 100644
--- a/tsl/src/compression/api.c
+++ b/tsl/src/compression/api.c
@@ -396,12 +396,20 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
 	/* Recreate FK constraints, since they were dropped during compression. */
 	ts_chunk_create_fks(uncompressed_chunk);
 
-	/* Prevent readers from using the compressed chunk that is going to be deleted */
-	LockRelationOid(uncompressed_chunk->table_id, AccessExclusiveLock);
-
 	/* Delete the compressed chunk */
 	ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
 	ts_chunk_clear_compressed_chunk(uncompressed_chunk);
+
+	/*
+	 * Lock the compressed chunk that is going to be deleted. At this point,
+	 * the reference to the compressed chunk has already been removed from the
+	 * catalog, so new readers do not include it in their operations.
+	 *
+	 * Note: Calling performMultipleDeletions in chunk_index_tuple_delete
+	 * also requests an AccessExclusiveLock on the compressed_chunk. However,
+	 * this call makes the lock on the chunk explicit.
+	 */
+	LockRelationOid(compressed_chunk->table_id, AccessExclusiveLock);
 	ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1);
 
 	/* reenable autovacuum if necessary */
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index 22fb304b5..357754b3d 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -38,6 +38,7 @@
 
 #include "array.h"
 #include "chunk.h"
+#include "debug_point.h"
 #include "deltadelta.h"
 #include "dictionary.h"
 #include "gorilla.h"
@@ -1134,7 +1135,15 @@ decompress_chunk(Oid in_table, Oid out_table)
 	ReindexParams params = { 0 };
 	ReindexParams *options = &params;
 #endif
+
+	/* The reindex_relation() function acquires an AccessExclusiveLock on the
+	 * chunk index (if present). After calling this function, concurrent
+	 * SELECTs have to wait until the index lock is released. When no
+	 * index is present, concurrent SELECTs can still be performed in
+	 * parallel. */
+	DEBUG_WAITPOINT("decompress_chunk_impl_before_reindex");
 	reindex_relation(out_table, 0, options);
+	DEBUG_WAITPOINT("decompress_chunk_impl_after_reindex");
 
 	table_close(out_rel, NoLock);
 	table_close(in_rel, NoLock);
diff --git a/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out
new file mode 100644
index 000000000..b1e7d0ddc
--- /dev/null
+++ b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out
@@ -0,0 +1,88 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s3_lock_decompression_locks s2_read_sensor_data s1_decompress s2_read_sensor_data s3_unlock_decompression_before_reindex_lock s2_read_sensor_data s3_unlock_decompression_after_reindex_lock
+compression_status
+------------------
+Compressed
+(1 row)
+
+step s3_lock_decompression_locks: 
+	-- This waitpoint is defined before the decompressed chunk is re-indexed. 
Up to this + -- point parallel SELECTs should be possible. + SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex'); + + -- This waitpoint is defined after all locks for the decompression and the deletion + -- of the compressed chunk are requested. + SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex'); + +debug_waitpoint_enable +---------------------- + +(1 row) + +debug_waitpoint_enable +---------------------- + +(1 row) + +step s2_read_sensor_data: + SELECT FROM sensor_data; + +step s1_decompress: + SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); + +step s2_read_sensor_data: + SELECT FROM sensor_data; + +step s3_unlock_decompression_before_reindex_lock: + -- Ensure that we are waiting on our debug waitpoint + -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init()) + -- decompress_chunk_impl_before_reindex = 3966149665. + SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted; + + SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex'); + +locktype|mode |granted| objid +--------+---------+-------+---------- +advisory|ShareLock|f |3966149665 +(1 row) + +debug_waitpoint_release +----------------------- + +(1 row) + +step s2_read_sensor_data: + SELECT FROM sensor_data; + +step s3_unlock_decompression_after_reindex_lock: + -- Ensure that we are waiting on our debug waitpoint + -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init()) + -- decompress_chunk_impl_after_reindex = 1858017383. + SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted; + + SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex'); + +locktype|mode |granted| objid +--------+---------+-------+---------- +advisory|ShareLock|f |1858017383 +(1 row) + +debug_waitpoint_release +----------------------- + +(1 row) + +step s1_decompress: <... completed> +count +----- + 1 +(1 row) + +compression_status +------------------ +Uncompressed +(1 row) + +step s2_read_sensor_data: <... completed> diff --git a/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out new file mode 100644 index 000000000..334ea727d --- /dev/null +++ b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out @@ -0,0 +1,90 @@ +Parsed test spec with 3 sessions + +starting permutation: s3_lock_decompression_locks s2_read_sensor_data s1_decompress s2_read_sensor_data s3_unlock_decompression_before_reindex_lock s2_read_sensor_data s3_unlock_decompression_after_reindex_lock s2_read_sensor_data +compression_status +------------------ +Compressed +(1 row) + +step s3_lock_decompression_locks: + -- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this + -- point parallel SELECTs should be possible. + SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex'); + + -- This waitpoint is defined after all locks for the decompression and the deletion + -- of the compressed chunk are requested. 
+	SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
+
+debug_waitpoint_enable
+----------------------
+
+(1 row)
+
+debug_waitpoint_enable
+----------------------
+
+(1 row)
+
+step s2_read_sensor_data: 
+	SELECT FROM sensor_data;
+
+step s1_decompress: 
+	SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+
+step s2_read_sensor_data: 
+	SELECT FROM sensor_data;
+
+step s3_unlock_decompression_before_reindex_lock: 
+	-- Ensure that we are waiting on our debug waitpoint
+	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
+	-- decompress_chunk_impl_before_reindex = 3966149665.
+	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
+
+	SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
+
+locktype|mode     |granted|     objid
+--------+---------+-------+----------
+advisory|ShareLock|f      |3966149665
+(1 row)
+
+debug_waitpoint_release
+-----------------------
+
+(1 row)
+
+step s2_read_sensor_data: 
+	SELECT FROM sensor_data;
+
+step s3_unlock_decompression_after_reindex_lock: 
+	-- Ensure that we are waiting on our debug waitpoint
+	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
+	-- decompress_chunk_impl_after_reindex = 1858017383.
+	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
+
+	SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
+
+locktype|mode     |granted|     objid
+--------+---------+-------+----------
+advisory|ShareLock|f      |1858017383
+(1 row)
+
+debug_waitpoint_release
+-----------------------
+
+(1 row)
+
+step s1_decompress: <... completed>
+count
+-----
+    1
+(1 row)
+
+compression_status
+------------------
+Uncompressed
+(1 row)
+
+step s2_read_sensor_data: 
+	SELECT FROM sensor_data;
+
diff --git a/tsl/test/isolation/specs/CMakeLists.txt b/tsl/test/isolation/specs/CMakeLists.txt
index 9e9b98020..59303ebaf 100644
--- a/tsl/test/isolation/specs/CMakeLists.txt
+++ b/tsl/test/isolation/specs/CMakeLists.txt
@@ -9,7 +9,9 @@ set(TEST_TEMPLATES_MODULE_DEBUG
     dist_restore_point.spec.in
     cagg_drop_chunks.spec.in
     telemetry.spec.in
-    compression_chunk_race.spec.in)
+    compression_chunk_race.spec.in
+    decompression_chunk_and_parallel_query.in
+    decompression_chunk_and_parallel_query_wo_idx.in)
 
 # These tests are using markers for the isolation tests (to avoid race
 # conditions causing differing output), which were added after 12.7, 13.3, and
diff --git a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in
new file mode 100644
index 000000000..e537d4cf4
--- /dev/null
+++ b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in
@@ -0,0 +1,109 @@
+# This file and its contents are licensed under the Timescale License.
+# Please see the included NOTICE for copyright information and
+# LICENSE-TIMESCALE for a copy of the license.
+
+###
+# This isolation test checks that SELECT queries can be performed in parallel to
+# chunk decompression operations. This version of the isolation test creates the
+# default index on the time column. See the decompression_chunk_and_parallel_query_wo_idx
+# test for a version without any index.
+### + +setup { + CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT + AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_enable'; + + CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT + AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_release'; + + CREATE TABLE sensor_data ( + time timestamptz not null, + sensor_id integer not null, + cpu double precision null, + temperature double precision null); + + -- Create the hypertable + SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days'); + +-- SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE); + + + -- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is + -- used in this isolation test. In contrast to 'policy_compression_execute' all decompression + -- operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk' + -- could lead to deadlocks that do not occur real-world scenarios (due to locks hold on a completely + -- decompressed chunk). + + INSERT INTO sensor_data + SELECT time + (INTERVAL '1 minute' * random()) AS time, + sensor_id, + random() AS cpu, + random()* 100 AS temperature + FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time), + generate_series(1, 50, 1) AS g2(sensor_id) + ORDER BY time; + + SELECT count(*) FROM sensor_data; + + ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id, cpu'); + + SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); +} + +teardown { + DROP TABLE sensor_data; +} + +session "s1" + +step "s1_decompress" { + SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); +} + +session "s2" + +step "s2_read_sensor_data" { + SELECT FROM sensor_data; +} + +session "s3" + +step "s3_lock_decompression_locks" { + -- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this + -- point parallel SELECTs should be possible. + SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex'); + + -- This waitpoint is defined after all locks for the decompression and the deletion + -- of the compressed chunk are requested. + SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex'); +} + +step "s3_unlock_decompression_before_reindex_lock" { + -- Ensure that we are waiting on our debug waitpoint + -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init()) + -- decompress_chunk_impl_before_reindex = 3966149665. + SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted; + + SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex'); +} + +step "s3_unlock_decompression_after_reindex_lock" { + -- Ensure that we are waiting on our debug waitpoint + -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init()) + -- decompress_chunk_impl_after_reindex = 1858017383. 
+	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
+
+	SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
+}
+
+# Desired execution:
+# s3_lock_decompression_locks - Locks the decompression waitpoints.
+# s2_read_sensor_data - Read the compressed chunk. This should be executed without blocking.
+# s1_decompress - Start the decompression and block on the first waitpoint.
+# s2_read_sensor_data - Read the compressed chunk again. This should still be possible without blocking.
+# s3_unlock_decompression_before_reindex_lock - Releases the decompress_chunk_impl_before_reindex waitpoint.
+# s1_decompress continues - The chunk is reindexed and the index is locked.
+# s2_read_sensor_data - Read the chunk. This blocks due to the locked index.
+# s3_unlock_decompression_after_reindex_lock - Releases the decompress_chunk_impl_after_reindex waitpoint.
+# s1_decompress continues - Finishes the decompression operation and releases the locks.
+# s2_read_sensor_data continues.
+permutation "s3_lock_decompression_locks" "s2_read_sensor_data" "s1_decompress" "s2_read_sensor_data" "s3_unlock_decompression_before_reindex_lock" "s2_read_sensor_data" "s3_unlock_decompression_after_reindex_lock"
diff --git a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query_wo_idx.in b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query_wo_idx.in
new file mode 100644
index 000000000..7cc970ed2
--- /dev/null
+++ b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query_wo_idx.in
@@ -0,0 +1,109 @@
+# This file and its contents are licensed under the Timescale License.
+# Please see the included NOTICE for copyright information and
+# LICENSE-TIMESCALE for a copy of the license.
+
+###
+# This isolation test checks that SELECT queries can be performed in parallel to
+# chunk decompression operations. This version of the isolation test creates a
+# hypertable without any index. So, no locks on the index can be created, which
+# leads to a different level of concurrency.
+###
+
+setup {
+	CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
+	AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_enable';
+
+	CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
+	AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_release';
+
+	CREATE TABLE sensor_data (
+	time timestamptz not null,
+	sensor_id integer not null,
+	cpu double precision null,
+	temperature double precision null);
+
+	-- Create the hypertable without default indexes
+	SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE);
+
+	-- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is
+	-- used in this isolation test. In contrast to 'policy_compression_execute', all decompression
+	-- operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk'
+	-- could lead to deadlocks that do not occur in real-world scenarios (due to locks held on a completely
+	-- decompressed chunk).
+
+	INSERT INTO sensor_data
+	SELECT time + (INTERVAL '1 minute' * random()) AS time,
+	sensor_id,
+	random() AS cpu,
+	random() * 100 AS temperature
+	FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time),
+	generate_series(1, 50, 1) AS g2(sensor_id)
+	ORDER BY time;
+
+	SELECT count(*) FROM sensor_data;
+
+	ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id, cpu');
+
+	SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+}
+
+teardown {
+	DROP TABLE sensor_data;
+}
+
+session "s1"
+
+step "s1_decompress" {
+	SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+}
+
+session "s2"
+
+step "s2_read_sensor_data" {
+	SELECT FROM sensor_data;
+}
+
+session "s3"
+
+step "s3_lock_decompression_locks" {
+	-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
+	-- point parallel SELECTs should be possible.
+	SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
+
+	-- This waitpoint is defined after all locks for the decompression and the deletion
+	-- of the compressed chunk are requested.
+	SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
+}
+
+step "s3_unlock_decompression_before_reindex_lock" {
+	-- Ensure that we are waiting on our debug waitpoint
+	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
+	-- decompress_chunk_impl_before_reindex = 3966149665.
+	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
+
+	SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
+}
+
+step "s3_unlock_decompression_after_reindex_lock" {
+	-- Ensure that we are waiting on our debug waitpoint
+	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
+	-- decompress_chunk_impl_after_reindex = 1858017383.
+	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
+
+	SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
+}
+
+# Desired execution (s2_read_sensor_data should never block):
+# s3_lock_decompression_locks - Locks the decompression waitpoints.
+# s2_read_sensor_data - Read the compressed chunk. This should be executed without blocking.
+# s1_decompress - Start the decompression and block on the first waitpoint.
+# s2_read_sensor_data - Read the compressed chunk again. This should still be possible without blocking.
+# s3_unlock_decompression_before_reindex_lock - Releases the decompress_chunk_impl_before_reindex waitpoint.
+# s1_decompress continues - No reindex is performed because no indexes exist.
+# s2_read_sensor_data - Read the chunk without any lock.
+# s3_unlock_decompression_after_reindex_lock - Releases the decompress_chunk_impl_after_reindex waitpoint.
+# s1_decompress continues - Finishes the decompression operation and releases the locks.
+# s2_read_sensor_data - Read the chunk without any lock.
+permutation "s3_lock_decompression_locks" "s2_read_sensor_data" "s1_decompress" "s2_read_sensor_data" "s3_unlock_decompression_before_reindex_lock" "s2_read_sensor_data" "s3_unlock_decompression_after_reindex_lock" "s2_read_sensor_data"
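
Reviewer note (commentary, not part of the patch): the fix moves the
AccessExclusiveLock from the uncompressed chunk to the compressed chunk that is
about to be dropped, so concurrent SELECTs on the hypertable only block while
reindex_relation() holds the index lock. The same behavior can also be observed
by hand, outside the isolation framework. A minimal sketch, assuming a
sensor_data hypertable set up as in the specs above; pg_locks,
pg_stat_activity, and pg_blocking_pids() are standard PostgreSQL catalogs and
functions:

    -- Session A: run the decompression inside an explicit transaction so its
    -- locks are held until COMMIT.
    BEGIN;
    SELECT decompress_chunk(i, if_compressed => true)
    FROM show_chunks('sensor_data') i;

    -- Session B: a concurrent reader. With an index present, this blocks once
    -- reindex_relation() has taken its AccessExclusiveLock on the chunk index.
    SELECT count(*) FROM sensor_data;

    -- Session C: inspect the waiters. pg_blocking_pids() lists the backends
    -- whose locks block the given PID; rows with a non-empty result are stuck.
    SELECT pid, pg_blocking_pids(pid) AS blocked_by, wait_event_type, query
    FROM pg_stat_activity
    WHERE cardinality(pg_blocking_pids(pid)) > 0;

    -- Session A: release all locks and unblock session B.
    COMMIT;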