From 623381ce99978b7f05f32ec1f5c117345ef6cd8e Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 29 Aug 2023 12:39:17 +0200 Subject: [PATCH] Reread the catalog data after locking the chunk The compression status can change, and this is prevented by locking, so to keep our data consistent, we should reread the chunk metadata after we have locked it. Currently we have some code in place that masks this inconsistency by rereading the metadata in another place. This code is removed. --- .../workflows/sanitizer-build-and-test.yaml | 1 + src/chunk_scan.c | 156 +++++++----------- src/planner/planner.c | 5 - src/planner/planner.h | 1 - .../concurrent_query_and_drop_chunks.out | 4 +- .../concurrent_query_and_drop_chunks.spec | 6 +- tsl/src/planner.c | 2 +- .../expected/concurrent_decompress_update.out | 38 +++++ tsl/test/isolation/specs/CMakeLists.txt | 4 + .../specs/concurrent_decompress_update.spec | 78 +++++++++ .../decompression_chunk_and_parallel_query.in | 4 +- 11 files changed, 191 insertions(+), 108 deletions(-) create mode 100644 tsl/test/isolation/expected/concurrent_decompress_update.out create mode 100644 tsl/test/isolation/specs/concurrent_decompress_update.spec diff --git a/.github/workflows/sanitizer-build-and-test.yaml b/.github/workflows/sanitizer-build-and-test.yaml index 1095c4708..8b22d99ac 100644 --- a/.github/workflows/sanitizer-build-and-test.yaml +++ b/.github/workflows/sanitizer-build-and-test.yaml @@ -8,6 +8,7 @@ name: Sanitizer test branches: - main - prerelease_test + - trigger/sanitizer pull_request: paths: .github/workflows/sanitizer-build-and-test.yaml diff --git a/src/chunk_scan.c b/src/chunk_scan.c index 0a561fb5f..10c576dd4 100644 --- a/src/chunk_scan.c +++ b/src/chunk_scan.c @@ -43,123 +43,98 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned { MemoryContext work_mcxt = AllocSetContextCreate(CurrentMemoryContext, "chunk-scan-work", ALLOCSET_DEFAULT_SIZES); - MemoryContext per_tuple_mcxt = - AllocSetContextCreate(work_mcxt, "chunk-scan-per-tuple", ALLOCSET_SMALL_SIZES); - MemoryContext orig_mcxt; Chunk **locked_chunks = NULL; - Chunk **unlocked_chunks = NULL; int locked_chunk_count = 0; - int unlocked_chunk_count = 0; ListCell *lc; int remote_chunk_count = 0; Assert(OidIsValid(hs->main_table_relid)); - orig_mcxt = MemoryContextSwitchTo(work_mcxt); + MemoryContext orig_mcxt = MemoryContextSwitchTo(work_mcxt); /* * For each matching chunk, fill in the metadata from the "chunk" table. * Make sure to filter out "dropped" chunks. */ ScanIterator chunk_it = ts_chunk_scan_iterator_create(orig_mcxt); - unlocked_chunks = MemoryContextAlloc(work_mcxt, sizeof(Chunk *) * list_length(chunk_ids)); + locked_chunks = MemoryContextAlloc(orig_mcxt, sizeof(Chunk *) * list_length(chunk_ids)); foreach (lc, chunk_ids) { int chunk_id = lfirst_int(lc); - TupleInfo *ti; Assert(CurrentMemoryContext == work_mcxt); ts_chunk_scan_iterator_set_chunk_id(&chunk_it, chunk_id); ts_scan_iterator_start_or_restart_scan(&chunk_it); - ti = ts_scan_iterator_next(&chunk_it); - - if (ti) + TupleInfo *ti = ts_scan_iterator_next(&chunk_it); + if (ti == NULL) { - bool isnull; - Datum datum = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull); - bool is_dropped = isnull ? false : DatumGetBool(datum); - - MemoryContextSwitchTo(per_tuple_mcxt); - MemoryContextReset(per_tuple_mcxt); - - if (!is_dropped) - { - Chunk *chunk = MemoryContextAllocZero(orig_mcxt, sizeof(Chunk)); - - MemoryContext old_mcxt = MemoryContextSwitchTo(ti->mctx); - ts_chunk_formdata_fill(&chunk->fd, ti); - MemoryContextSwitchTo(old_mcxt); - - chunk->constraints = NULL; - chunk->cube = NULL; - chunk->hypertable_relid = hs->main_table_relid; - - unlocked_chunks[unlocked_chunk_count] = chunk; - unlocked_chunk_count++; - } - - MemoryContextSwitchTo(work_mcxt); - /* Only one chunk should match */ - Assert(ts_scan_iterator_next(&chunk_it) == NULL); + continue; } + bool isnull; + Datum datum = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull); + const bool is_dropped = isnull ? false : DatumGetBool(datum); + if (is_dropped) + { + continue; + } + + /* We found a chunk that is not dropped. First, try to lock it. */ + Name schema_name = DatumGetName(slot_getattr(ti->slot, Anum_chunk_schema_name, &isnull)); + Assert(!isnull); + Name table_name = DatumGetName(slot_getattr(ti->slot, Anum_chunk_table_name, &isnull)); + Assert(!isnull); + + Oid chunk_reloid = ts_get_relation_relid(NameStr(*schema_name), + NameStr(*table_name), + /* return_invalid = */ false); + Assert(OidIsValid(chunk_reloid)); + + /* Only one chunk should match */ + Assert(ts_scan_iterator_next(&chunk_it) == NULL); + + DEBUG_WAITPOINT("hypertable_expansion_before_lock_chunk"); + if (!ts_chunk_lock_if_exists(chunk_reloid, AccessShareLock)) + { + continue; + } + + /* + * Now after we have locked the chunk, we have to reread its metadata. + * It might have been modified concurrently by decompression, for + * example. + */ + ts_chunk_scan_iterator_set_chunk_id(&chunk_it, chunk_id); + ts_scan_iterator_start_or_restart_scan(&chunk_it); + ti = ts_scan_iterator_next(&chunk_it); + Assert(ti != NULL); + Chunk *chunk = MemoryContextAllocZero(orig_mcxt, sizeof(Chunk)); + + ts_chunk_formdata_fill(&chunk->fd, ti); + + chunk->constraints = NULL; + chunk->cube = NULL; + chunk->hypertable_relid = hs->main_table_relid; + chunk->table_id = chunk_reloid; + + locked_chunks[locked_chunk_count] = chunk; + locked_chunk_count++; + + /* Only one chunk should match */ + Assert(ts_scan_iterator_next(&chunk_it) == NULL); } ts_scan_iterator_close(&chunk_it); - Assert(unlocked_chunk_count == 0 || unlocked_chunks != NULL); - Assert(unlocked_chunk_count <= list_length(chunk_ids)); + Assert(locked_chunk_count == 0 || locked_chunks != NULL); + Assert(locked_chunk_count <= list_length(chunk_ids)); Assert(CurrentMemoryContext == work_mcxt); - DEBUG_WAITPOINT("expanded_chunks"); - - /* - * Batch the lookups to each catalog cache to have more favorable access - * patterns. - * Schema oid isn't likely to change, so cache it. - */ - char *last_schema_name = NULL; - for (int i = 0; i < unlocked_chunk_count; i++) + for (int i = 0; i < locked_chunk_count; i++) { - Chunk *chunk = unlocked_chunks[i]; - - char *current_schema_name = NameStr(chunk->fd.schema_name); - if (last_schema_name == NULL || strcmp(last_schema_name, current_schema_name) != 0) - { - last_schema_name = current_schema_name; - } - - chunk->table_id = - ts_get_relation_relid(last_schema_name, NameStr(chunk->fd.table_name), false); - Assert(OidIsValid(chunk->table_id)); - } - - for (int i = 0; i < unlocked_chunk_count; i++) - { - Chunk *chunk = unlocked_chunks[i]; + Chunk *chunk = locked_chunks[i]; chunk->relkind = get_rel_relkind(chunk->table_id); - } - - /* - * Lock the chunks. - */ - for (int i = 0; i < unlocked_chunk_count; i++) - { - Chunk *chunk = unlocked_chunks[i]; - - if (ts_chunk_lock_if_exists(chunk->table_id, AccessShareLock)) - { - /* Lazy initialize the chunks array */ - if (NULL == locked_chunks) - locked_chunks = - MemoryContextAlloc(orig_mcxt, sizeof(Chunk *) * unlocked_chunk_count); - - locked_chunks[locked_chunk_count] = chunk; - - if (chunk->relkind == RELKIND_FOREIGN_TABLE) - remote_chunk_count++; - - locked_chunk_count++; - } + if (chunk->relkind == RELKIND_FOREIGN_TABLE) + remote_chunk_count++; } /* @@ -178,9 +153,7 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned while (ts_scan_iterator_next(&constr_it) != NULL) { TupleInfo *constr_ti = ts_scan_iterator_tuple_info(&constr_it); - MemoryContextSwitchTo(per_tuple_mcxt); ts_chunk_constraints_add_from_tuple(chunk->constraints, constr_ti); - MemoryContextSwitchTo(work_mcxt); } } ts_scan_iterator_close(&constr_it); @@ -264,9 +237,6 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned MemoryContext old_mcxt; HeapTuple tuple; - MemoryContextSwitchTo(per_tuple_mcxt); - MemoryContextReset(per_tuple_mcxt); - tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); form = (Form_chunk_data_node) GETSTRUCT(tuple); old_mcxt = MemoryContextSwitchTo(ti->mctx); @@ -280,8 +250,6 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned if (should_free) heap_freetuple(tuple); - - MemoryContextSwitchTo(work_mcxt); } } } diff --git a/src/planner/planner.c b/src/planner/planner.c index e080bbf61..65ebeef0a 100644 --- a/src/planner/planner.c +++ b/src/planner/planner.c @@ -1320,10 +1320,6 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID) { - Relation uncompressed_chunk = table_open(relation_objectid, NoLock); - - ts_get_private_reloptinfo(rel)->compressed = true; - /* Planning indexes is expensive, and if this is a fully compressed chunk, we * know we'll never need to use indexes on the uncompressed version, since * all the data is in the compressed chunk anyway. Therefore, it is much @@ -1334,7 +1330,6 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo */ if (!ts_chunk_is_partial(chunk)) rel->indexlist = NIL; - table_close(uncompressed_chunk, NoLock); } } break; diff --git a/src/planner/planner.h b/src/planner/planner.h index 5b453dd3f..0d8598a4f 100644 --- a/src/planner/planner.h +++ b/src/planner/planner.h @@ -33,7 +33,6 @@ typedef struct TimescaleDBPrivate /* attno of the time dimension in the parent table if appends are ordered */ int order_attno; List *nested_oids; - bool compressed; List *chunk_oids; List *serverids; Relids server_relids; diff --git a/test/isolation/expected/concurrent_query_and_drop_chunks.out b/test/isolation/expected/concurrent_query_and_drop_chunks.out index f2534fed4..aa0d48539 100644 --- a/test/isolation/expected/concurrent_query_and_drop_chunks.out +++ b/test/isolation/expected/concurrent_query_and_drop_chunks.out @@ -8,7 +8,7 @@ Fri Jan 03 10:30:00 2020 PST| 1| 1 Sun Jan 03 10:30:00 2021 PST| 2| 2 (2 rows) -step s1_wp_enable: SELECT debug_waitpoint_enable('expanded_chunks'); +step s1_wp_enable: SELECT debug_waitpoint_enable('hypertable_expansion_before_lock_chunk'); debug_waitpoint_enable ---------------------- @@ -21,7 +21,7 @@ count 1 (1 row) -step s1_wp_release: SELECT debug_waitpoint_release('expanded_chunks'); +step s1_wp_release: SELECT debug_waitpoint_release('hypertable_expansion_before_lock_chunk'); debug_waitpoint_release ----------------------- diff --git a/test/isolation/specs/concurrent_query_and_drop_chunks.spec b/test/isolation/specs/concurrent_query_and_drop_chunks.spec index 610916c4c..96769feae 100644 --- a/test/isolation/specs/concurrent_query_and_drop_chunks.spec +++ b/test/isolation/specs/concurrent_query_and_drop_chunks.spec @@ -19,9 +19,9 @@ teardown { # acqurired, the chunk should also be ignored. session "s1" -step "s1_wp_enable" { SELECT debug_waitpoint_enable('expanded_chunks'); } -step "s1_wp_release" { SELECT debug_waitpoint_release('expanded_chunks'); } -step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('measurements', TIMESTAMPTZ '2020-03-01'); } +step "s1_wp_enable" { SELECT debug_waitpoint_enable('hypertable_expansion_before_lock_chunk'); } +step "s1_wp_release" { SELECT debug_waitpoint_release('hypertable_expansion_before_lock_chunk'); } +step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('measurements', TIMESTAMPTZ '2020-03-01'); } session "s2" step "s2_show_num_chunks" { SELECT count(*) FROM show_chunks('measurements') ORDER BY 1; } diff --git a/tsl/src/planner.c b/tsl/src/planner.c index 074fb2de7..bf30fca5d 100644 --- a/tsl/src/planner.c +++ b/tsl/src/planner.c @@ -125,7 +125,7 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT if (ts_guc_enable_transparent_decompression && ht && (rel->reloptkind == RELOPT_OTHER_MEMBER_REL || (rel->reloptkind == RELOPT_BASEREL && ts_rte_is_marked_for_expansion(rte))) && - TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht) && fdw_private != NULL && fdw_private->compressed) + TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht)) { if (fdw_private->cached_chunk_struct == NULL) { diff --git a/tsl/test/isolation/expected/concurrent_decompress_update.out b/tsl/test/isolation/expected/concurrent_decompress_update.out new file mode 100644 index 000000000..c6fc9223a --- /dev/null +++ b/tsl/test/isolation/expected/concurrent_decompress_update.out @@ -0,0 +1,38 @@ +Parsed test spec with 2 sessions + +starting permutation: s2_explain_update s1_begin s1_decompress s2_explain_update s1_commit s2_explain_update +compression_status +------------------ +Compressed +(1 row) + +step s2_explain_update: + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; + +step s1_begin: + BEGIN; + +step s1_decompress: + SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); + +count +----- + 1 +(1 row) + +compression_status +------------------ +Uncompressed +(1 row) + +step s2_explain_update: + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; + +step s1_commit: + COMMIT; + +step s2_explain_update: <... completed> +step s2_explain_update: + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; + diff --git a/tsl/test/isolation/specs/CMakeLists.txt b/tsl/test/isolation/specs/CMakeLists.txt index 6f381b3aa..db5e80c29 100644 --- a/tsl/test/isolation/specs/CMakeLists.txt +++ b/tsl/test/isolation/specs/CMakeLists.txt @@ -18,6 +18,10 @@ list( cagg_concurrent_refresh_dist_ht.spec deadlock_drop_chunks_compress.spec) +if(PG_VERSION VERSION_GREATER_EQUAL "14.0") + list(APPEND TEST_FILES concurrent_decompress_update.spec) +endif() + if(CMAKE_BUILD_TYPE MATCHES Debug) list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG}) list( diff --git a/tsl/test/isolation/specs/concurrent_decompress_update.spec b/tsl/test/isolation/specs/concurrent_decompress_update.spec new file mode 100644 index 000000000..10bf6b5f6 --- /dev/null +++ b/tsl/test/isolation/specs/concurrent_decompress_update.spec @@ -0,0 +1,78 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +### +# This isolation test checks that SELECT queries can be performed in parallel to +# chunk decompression operations. This version of the isolation tests creates the +# default index on the time column. See the decompression_chunk_and_parallel_query_wo_idx +# test for a version without any index. +### + +setup { + + CREATE TABLE sensor_data ( + time timestamptz not null, + sensor_id integer not null, + cpu double precision null, + temperature double precision null); + + -- Create the hypertable + SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days'); + +-- SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE); + + + -- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is + -- used in this isolation test. In contrast to 'policy_compression_execute' all decompression + -- operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk' + -- could lead to deadlocks that do not occur real-world scenarios (due to locks hold on a completely + -- decompressed chunk). + + INSERT INTO sensor_data + SELECT time + (INTERVAL '1 minute' * random()) AS time, + sensor_id, + random() AS cpu, + random()* 100 AS temperature + FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '5 minute') AS g1(time), + generate_series(1, 10, 1) AS g2(sensor_id) + ORDER BY time; + + SELECT count(*) FROM sensor_data; + + ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id'); + + SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); +} + +teardown { + DROP TABLE sensor_data; +} + +session "s1" + +step "s1_begin" { + BEGIN; +} + +step "s1_decompress" { + SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); +} + +step "s1_commit" { + COMMIT; +} + +session "s2" + +step "s2_explain_update" { + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; +} + +# UPDATE/DELETE queries don't use TimescaleDB hypertable expansion, and use the +# Postgres expansion code, which might have different locking behavior. Test it +# as well. +permutation "s2_explain_update" "s1_begin" "s1_decompress" "s2_explain_update" "s1_commit" "s2_explain_update" + diff --git a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in index 9e37cfdbf..784a908c0 100644 --- a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in +++ b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in @@ -34,8 +34,8 @@ setup { sensor_id, random() AS cpu, random()* 100 AS temperature - FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time), - generate_series(1, 50, 1) AS g2(sensor_id) + FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '5 minute') AS g1(time), + generate_series(1, 10, 1) AS g2(sensor_id) ORDER BY time; SELECT count(*) FROM sensor_data;