1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-19 12:13:24 +08:00

Fix a deadlock in chunk decompression and SELECTs

This patch fixes a deadlock between chunk decompression and SELECT
queries executed in parallel. The change in
a608d7db614c930213dee8d6a5e9d26a0259da61 requests an AccessExclusiveLock
for the decompressed chunk instead of the compressed chunk, resulting in
deadlocks.

In addition, an isolation test has been added to test that SELECT
queries on a chunk that is currently decompressed can be executed.

Fixes 
This commit is contained in:
Jan Nidzwetzki 2022-09-06 16:24:20 +02:00 committed by Jan Nidzwetzki
parent 5600fc06d6
commit de30d190e4
8 changed files with 423 additions and 4 deletions

@ -23,6 +23,7 @@ argument or resolve the type ambiguity by casting to the intended type.
* #4681 Fix compression_chunk_size primary key
* #4685 Improve chunk exclusion for space dimensions
* #4696 Report warning when enabling compression on hypertable
* #4676 Fix a deadlock when decompressing chunks and performing SELECTs
**Thanks**
* @maxtwardowski for reporting problems with chunk exclusion and space dimensions

@ -396,12 +396,20 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
/* Recreate FK constraints, since they were dropped during compression. */
ts_chunk_create_fks(uncompressed_chunk);
/* Prevent readers from using the compressed chunk that is going to be deleted */
LockRelationOid(uncompressed_chunk->table_id, AccessExclusiveLock);
/* Delete the compressed chunk */
ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
ts_chunk_clear_compressed_chunk(uncompressed_chunk);
/*
* Lock the compressed chunk that is going to be deleted. At this point,
* the reference to the compressed chunk is already removed from the
* catalog. So, new readers do not include it in their operations.
*
* Note: Calling performMultipleDeletions in chunk_index_tuple_delete
* also requests an AccessExclusiveLock on the compressed_chunk. However,
* this call makes the lock on the chunk explicit.
*/
LockRelationOid(compressed_chunk->table_id, AccessExclusiveLock);
ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1);
/* reenable autovacuum if necessary */

@ -38,6 +38,7 @@
#include "array.h"
#include "chunk.h"
#include "debug_point.h"
#include "deltadelta.h"
#include "dictionary.h"
#include "gorilla.h"
@ -1134,7 +1135,15 @@ decompress_chunk(Oid in_table, Oid out_table)
ReindexParams params = { 0 };
ReindexParams *options = &params;
#endif
/* The reindex_relation() function creates an AccessExclusiveLock on the
* chunk index (if present). After calling this function, concurrent
* SELECTs have to wait until the index lock is released. When no
* index is present concurrent SELECTs can be still performed in
* parallel. */
DEBUG_WAITPOINT("decompress_chunk_impl_before_reindex");
reindex_relation(out_table, 0, options);
DEBUG_WAITPOINT("decompress_chunk_impl_after_reindex");
table_close(out_rel, NoLock);
table_close(in_rel, NoLock);

@ -0,0 +1,88 @@
Parsed test spec with 3 sessions
starting permutation: s3_lock_decompression_locks s2_read_sensor_data s1_decompress s2_read_sensor_data s3_unlock_decompression_before_reindex_lock s2_read_sensor_data s3_unlock_decompression_after_reindex_lock
compression_status
------------------
Compressed
(1 row)
step s3_lock_decompression_locks:
-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-- point parallel SELECTs should be possible.
SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-- This waitpoint is defined after all locks for the decompression and the deletion
-- of the compressed chunk are requested.
SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
debug_waitpoint_enable
----------------------
(1 row)
debug_waitpoint_enable
----------------------
(1 row)
step s2_read_sensor_data:
SELECT FROM sensor_data;
step s1_decompress:
SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
<waiting ...>
step s2_read_sensor_data:
SELECT FROM sensor_data;
step s3_unlock_decompression_before_reindex_lock:
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_before_reindex = 3966149665.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
locktype|mode |granted| objid
--------+---------+-------+----------
advisory|ShareLock|f |3966149665
(1 row)
debug_waitpoint_release
-----------------------
(1 row)
step s2_read_sensor_data:
SELECT FROM sensor_data;
<waiting ...>
step s3_unlock_decompression_after_reindex_lock:
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_after_reindex = 1858017383.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
locktype|mode |granted| objid
--------+---------+-------+----------
advisory|ShareLock|f |1858017383
(1 row)
debug_waitpoint_release
-----------------------
(1 row)
step s1_decompress: <... completed>
count
-----
1
(1 row)
compression_status
------------------
Uncompressed
(1 row)
step s2_read_sensor_data: <... completed>

@ -0,0 +1,90 @@
Parsed test spec with 3 sessions
starting permutation: s3_lock_decompression_locks s2_read_sensor_data s1_decompress s2_read_sensor_data s3_unlock_decompression_before_reindex_lock s2_read_sensor_data s3_unlock_decompression_after_reindex_lock s2_read_sensor_data
compression_status
------------------
Compressed
(1 row)
step s3_lock_decompression_locks:
-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-- point parallel SELECTs should be possible.
SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-- This waitpoint is defined after all locks for the decompression and the deletion
-- of the compressed chunk are requested.
SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
debug_waitpoint_enable
----------------------
(1 row)
debug_waitpoint_enable
----------------------
(1 row)
step s2_read_sensor_data:
SELECT FROM sensor_data;
step s1_decompress:
SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
<waiting ...>
step s2_read_sensor_data:
SELECT FROM sensor_data;
step s3_unlock_decompression_before_reindex_lock:
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_before_reindex = 3966149665.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
locktype|mode |granted| objid
--------+---------+-------+----------
advisory|ShareLock|f |3966149665
(1 row)
debug_waitpoint_release
-----------------------
(1 row)
step s2_read_sensor_data:
SELECT FROM sensor_data;
step s3_unlock_decompression_after_reindex_lock:
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_after_reindex = 1858017383.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
locktype|mode |granted| objid
--------+---------+-------+----------
advisory|ShareLock|f |1858017383
(1 row)
debug_waitpoint_release
-----------------------
(1 row)
step s1_decompress: <... completed>
count
-----
1
(1 row)
compression_status
------------------
Uncompressed
(1 row)
step s2_read_sensor_data:
SELECT FROM sensor_data;

@ -9,7 +9,9 @@ set(TEST_TEMPLATES_MODULE_DEBUG
dist_restore_point.spec.in
cagg_drop_chunks.spec.in
telemetry.spec.in
compression_chunk_race.spec.in)
compression_chunk_race.spec.in
decompression_chunk_and_parallel_query.in
decompression_chunk_and_parallel_query_wo_idx.in)
# These tests are using markers for the isolation tests (to avoid race
# conditions causing differing output), which were added after 12.7, 13.3, and

@ -0,0 +1,112 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.
###
# This isolation test checks that SELECT queries can be performed in parallel to
# chunk decompression operations. This version of the isolation tests creates the
# default index on the time column. See the decompression_chunk_and_parallel_query_wo_idx
# test for a version without any index.
###
setup {
CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_enable';
CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_release';
CREATE TABLE sensor_data (
time timestamptz not null,
sensor_id integer not null,
cpu double precision null,
temperature double precision null);
-- Create the hypertable
SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days');
-- SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE);
-- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is
-- used in this isolation test. In contrast to 'policy_compression_execute' all decompression
-- operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk'
-- could lead to deadlocks that do not occur real-world scenarios (due to locks hold on a completely
-- decompressed chunk).
INSERT INTO sensor_data
SELECT time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time),
generate_series(1, 50, 1) AS g2(sensor_id)
ORDER BY time;
SELECT count(*) FROM sensor_data;
ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id, cpu');
SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
}
teardown {
DROP TABLE sensor_data;
}
session "s1"
step "s1_decompress" {
SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
}
session "s2"
step "s2_read_sensor_data" {
SELECT FROM sensor_data;
}
session "s3"
step "s3_lock_decompression_locks" {
-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-- point parallel SELECTs should be possible.
SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-- This waitpoint is defined after all locks for the decompression and the deletion
-- of the compressed chunk are requested.
SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
}
step "s3_unlock_decompression_before_reindex_lock" {
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_before_reindex = 3966149665.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
}
step "s3_unlock_decompression_after_reindex_lock" {
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_after_reindex = 1858017383.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
}
# Desired execution:
# s3_lock_decompression_locks - Locks the decompression waitpoints.
# s2_read_sensor_data - Read the compressed chunk. This should be executed without blocking.
# s1_decompress - Start the decompression and block on the first waitpoint.
# s2_read_sensor_data - Read the compressed chunk again. This should be still possible without blocking.
# s3_unlock_decompression_before_reindex_lock - Releases the decompress_chunk_impl_before_reindex waitpoint.
# s1_decompress continues - The chunk is reindexed and the index is locked.
# s2_read_sensor_data - Read the chunk. This blocks due to the locked index.
# s3_unlock_decompression_after_reindex_lock - Releases the decompress_chunk_impl_after_compressed_chunk_lock.
# s1_decompress continues - Finishes the decompression operation and releases the locks.
# s2_read_sensor_data continues.
permutation "s3_lock_decompression_locks" "s2_read_sensor_data" "s1_decompress" "s2_read_sensor_data" "s3_unlock_decompression_before_reindex_lock" "s2_read_sensor_data" "s3_unlock_decompression_after_reindex_lock"

@ -0,0 +1,109 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.
###
# This isolation test checks that SELECT queries can be performed in parallel to
# chunk decompression operations. This version of the isolation test creates a
# hypertable without any index. So, no locks on the index can be creates which
# leads to a different level of concurrency.
###
setup {
CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_enable';
CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_release';
CREATE TABLE sensor_data (
time timestamptz not null,
sensor_id integer not null,
cpu double precision null,
temperature double precision null);
-- Create the hypertable without an index on the hypertable
SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE);
-- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is
-- used in this isolation test. In contrast to 'policy_compression_execute' all decompression
-- operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk'
-- could lead to deadlocks that do not occur real-world scenarios (due to locks hold on a completely
-- decompressed chunk).
INSERT INTO sensor_data
SELECT time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time),
generate_series(1, 50, 1) AS g2(sensor_id)
ORDER BY time;
SELECT count(*) FROM sensor_data;
ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id, cpu');
SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
}
teardown {
DROP TABLE sensor_data;
}
session "s1"
step "s1_decompress" {
SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
}
session "s2"
step "s2_read_sensor_data" {
SELECT FROM sensor_data;
}
session "s3"
step "s3_lock_decompression_locks" {
-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-- point parallel SELECTs should be possible.
SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-- This waitpoint is defined after all locks for the decompression and the deletion
-- of the compressed chunk are requested.
SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
}
step "s3_unlock_decompression_before_reindex_lock" {
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_before_reindex = 3966149665.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
}
step "s3_unlock_decompression_after_reindex_lock" {
-- Ensure that we are waiting on our debug waitpoint
-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-- decompress_chunk_impl_after_reindex = 1858017383.
SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
}
# Desired execution (no locks should be performed in s2_read_sensor_data):
# s3_lock_decompression_locks - Locks the decompression waitpoints.
# s2_read_sensor_data - Read the compressed chunk. This should be executed without blocking.
# s1_decompress - Start the decompression and block on the first waitpoint.
# s2_read_sensor_data - Read the compressed chunk again. This should be still possible without blocking.
# s3_unlock_decompression_before_reindex_lock - Releases the decompress_chunk_impl_before_reindex waitpoint.
# s1_decompress continues - No reindex is performed due to no existing indexes
# s2_read_sensor_data - Read the chunk without any lock.
# s3_unlock_decompression_after_reindex_lock - Releases the decompress_chunk_impl_after_compressed_chunk_lock.
# s1_decompress continues - Finishes the decompression operation and releases the locks.
# s2_read_sensor_data - Read the chunk without any lock.
permutation "s3_lock_decompression_locks" "s2_read_sensor_data" "s1_decompress" "s2_read_sensor_data" "s3_unlock_decompression_before_reindex_lock" "s2_read_sensor_data" "s3_unlock_decompression_after_reindex_lock" "s2_read_sensor_data"