diff --git a/.github/workflows/sanitizer-build-and-test.yaml b/.github/workflows/sanitizer-build-and-test.yaml index 1095c4708..8b22d99ac 100644 --- a/.github/workflows/sanitizer-build-and-test.yaml +++ b/.github/workflows/sanitizer-build-and-test.yaml @@ -8,6 +8,7 @@ name: Sanitizer test branches: - main - prerelease_test + - trigger/sanitizer pull_request: paths: .github/workflows/sanitizer-build-and-test.yaml diff --git a/src/chunk_scan.c b/src/chunk_scan.c index 0a561fb5f..10c576dd4 100644 --- a/src/chunk_scan.c +++ b/src/chunk_scan.c @@ -43,123 +43,98 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned { MemoryContext work_mcxt = AllocSetContextCreate(CurrentMemoryContext, "chunk-scan-work", ALLOCSET_DEFAULT_SIZES); - MemoryContext per_tuple_mcxt = - AllocSetContextCreate(work_mcxt, "chunk-scan-per-tuple", ALLOCSET_SMALL_SIZES); - MemoryContext orig_mcxt; Chunk **locked_chunks = NULL; - Chunk **unlocked_chunks = NULL; int locked_chunk_count = 0; - int unlocked_chunk_count = 0; ListCell *lc; int remote_chunk_count = 0; Assert(OidIsValid(hs->main_table_relid)); - orig_mcxt = MemoryContextSwitchTo(work_mcxt); + MemoryContext orig_mcxt = MemoryContextSwitchTo(work_mcxt); /* * For each matching chunk, fill in the metadata from the "chunk" table. * Make sure to filter out "dropped" chunks. 
*/ ScanIterator chunk_it = ts_chunk_scan_iterator_create(orig_mcxt); - unlocked_chunks = MemoryContextAlloc(work_mcxt, sizeof(Chunk *) * list_length(chunk_ids)); + locked_chunks = MemoryContextAlloc(orig_mcxt, sizeof(Chunk *) * list_length(chunk_ids)); foreach (lc, chunk_ids) { int chunk_id = lfirst_int(lc); - TupleInfo *ti; Assert(CurrentMemoryContext == work_mcxt); ts_chunk_scan_iterator_set_chunk_id(&chunk_it, chunk_id); ts_scan_iterator_start_or_restart_scan(&chunk_it); - ti = ts_scan_iterator_next(&chunk_it); - - if (ti) + TupleInfo *ti = ts_scan_iterator_next(&chunk_it); + if (ti == NULL) { - bool isnull; - Datum datum = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull); - bool is_dropped = isnull ? false : DatumGetBool(datum); - - MemoryContextSwitchTo(per_tuple_mcxt); - MemoryContextReset(per_tuple_mcxt); - - if (!is_dropped) - { - Chunk *chunk = MemoryContextAllocZero(orig_mcxt, sizeof(Chunk)); - - MemoryContext old_mcxt = MemoryContextSwitchTo(ti->mctx); - ts_chunk_formdata_fill(&chunk->fd, ti); - MemoryContextSwitchTo(old_mcxt); - - chunk->constraints = NULL; - chunk->cube = NULL; - chunk->hypertable_relid = hs->main_table_relid; - - unlocked_chunks[unlocked_chunk_count] = chunk; - unlocked_chunk_count++; - } - - MemoryContextSwitchTo(work_mcxt); - /* Only one chunk should match */ - Assert(ts_scan_iterator_next(&chunk_it) == NULL); + continue; } + bool isnull; + Datum datum = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull); + const bool is_dropped = isnull ? false : DatumGetBool(datum); + if (is_dropped) + { + continue; + } + + /* We found a chunk that is not dropped. First, try to lock it. 
*/ + Name schema_name = DatumGetName(slot_getattr(ti->slot, Anum_chunk_schema_name, &isnull)); + Assert(!isnull); + Name table_name = DatumGetName(slot_getattr(ti->slot, Anum_chunk_table_name, &isnull)); + Assert(!isnull); + + Oid chunk_reloid = ts_get_relation_relid(NameStr(*schema_name), + NameStr(*table_name), + /* return_invalid = */ false); + Assert(OidIsValid(chunk_reloid)); + + /* Only one chunk should match */ + Assert(ts_scan_iterator_next(&chunk_it) == NULL); + + DEBUG_WAITPOINT("hypertable_expansion_before_lock_chunk"); + if (!ts_chunk_lock_if_exists(chunk_reloid, AccessShareLock)) + { + continue; + } + + /* + * Now after we have locked the chunk, we have to reread its metadata. + * It might have been modified concurrently by decompression, for + * example. + */ + ts_chunk_scan_iterator_set_chunk_id(&chunk_it, chunk_id); + ts_scan_iterator_start_or_restart_scan(&chunk_it); + ti = ts_scan_iterator_next(&chunk_it); + Assert(ti != NULL); + Chunk *chunk = MemoryContextAllocZero(orig_mcxt, sizeof(Chunk)); + + ts_chunk_formdata_fill(&chunk->fd, ti); + + chunk->constraints = NULL; + chunk->cube = NULL; + chunk->hypertable_relid = hs->main_table_relid; + chunk->table_id = chunk_reloid; + + locked_chunks[locked_chunk_count] = chunk; + locked_chunk_count++; + + /* Only one chunk should match */ + Assert(ts_scan_iterator_next(&chunk_it) == NULL); } ts_scan_iterator_close(&chunk_it); - Assert(unlocked_chunk_count == 0 || unlocked_chunks != NULL); - Assert(unlocked_chunk_count <= list_length(chunk_ids)); + Assert(locked_chunk_count == 0 || locked_chunks != NULL); + Assert(locked_chunk_count <= list_length(chunk_ids)); Assert(CurrentMemoryContext == work_mcxt); - DEBUG_WAITPOINT("expanded_chunks"); - - /* - * Batch the lookups to each catalog cache to have more favorable access - * patterns. - * Schema oid isn't likely to change, so cache it. 
- */ - char *last_schema_name = NULL; - for (int i = 0; i < unlocked_chunk_count; i++) + for (int i = 0; i < locked_chunk_count; i++) { - Chunk *chunk = unlocked_chunks[i]; - - char *current_schema_name = NameStr(chunk->fd.schema_name); - if (last_schema_name == NULL || strcmp(last_schema_name, current_schema_name) != 0) - { - last_schema_name = current_schema_name; - } - - chunk->table_id = - ts_get_relation_relid(last_schema_name, NameStr(chunk->fd.table_name), false); - Assert(OidIsValid(chunk->table_id)); - } - - for (int i = 0; i < unlocked_chunk_count; i++) - { - Chunk *chunk = unlocked_chunks[i]; + Chunk *chunk = locked_chunks[i]; chunk->relkind = get_rel_relkind(chunk->table_id); - } - - /* - * Lock the chunks. - */ - for (int i = 0; i < unlocked_chunk_count; i++) - { - Chunk *chunk = unlocked_chunks[i]; - - if (ts_chunk_lock_if_exists(chunk->table_id, AccessShareLock)) - { - /* Lazy initialize the chunks array */ - if (NULL == locked_chunks) - locked_chunks = - MemoryContextAlloc(orig_mcxt, sizeof(Chunk *) * unlocked_chunk_count); - - locked_chunks[locked_chunk_count] = chunk; - - if (chunk->relkind == RELKIND_FOREIGN_TABLE) - remote_chunk_count++; - - locked_chunk_count++; - } + if (chunk->relkind == RELKIND_FOREIGN_TABLE) + remote_chunk_count++; } /* @@ -178,9 +153,7 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned while (ts_scan_iterator_next(&constr_it) != NULL) { TupleInfo *constr_ti = ts_scan_iterator_tuple_info(&constr_it); - MemoryContextSwitchTo(per_tuple_mcxt); ts_chunk_constraints_add_from_tuple(chunk->constraints, constr_ti); - MemoryContextSwitchTo(work_mcxt); } } ts_scan_iterator_close(&constr_it); @@ -264,9 +237,6 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned MemoryContext old_mcxt; HeapTuple tuple; - MemoryContextSwitchTo(per_tuple_mcxt); - MemoryContextReset(per_tuple_mcxt); - tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); form = (Form_chunk_data_node) 
GETSTRUCT(tuple); old_mcxt = MemoryContextSwitchTo(ti->mctx); @@ -280,8 +250,6 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned if (should_free) heap_freetuple(tuple); - - MemoryContextSwitchTo(work_mcxt); } } } diff --git a/src/planner/planner.c b/src/planner/planner.c index e080bbf61..65ebeef0a 100644 --- a/src/planner/planner.c +++ b/src/planner/planner.c @@ -1320,10 +1320,6 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID) { - Relation uncompressed_chunk = table_open(relation_objectid, NoLock); - - ts_get_private_reloptinfo(rel)->compressed = true; - /* Planning indexes is expensive, and if this is a fully compressed chunk, we * know we'll never need to use indexes on the uncompressed version, since * all the data is in the compressed chunk anyway. Therefore, it is much @@ -1334,7 +1330,6 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo */ if (!ts_chunk_is_partial(chunk)) rel->indexlist = NIL; - table_close(uncompressed_chunk, NoLock); } } break; diff --git a/src/planner/planner.h b/src/planner/planner.h index 5b453dd3f..0d8598a4f 100644 --- a/src/planner/planner.h +++ b/src/planner/planner.h @@ -33,7 +33,6 @@ typedef struct TimescaleDBPrivate /* attno of the time dimension in the parent table if appends are ordered */ int order_attno; List *nested_oids; - bool compressed; List *chunk_oids; List *serverids; Relids server_relids; diff --git a/test/isolation/expected/concurrent_query_and_drop_chunks.out b/test/isolation/expected/concurrent_query_and_drop_chunks.out index f2534fed4..aa0d48539 100644 --- a/test/isolation/expected/concurrent_query_and_drop_chunks.out +++ b/test/isolation/expected/concurrent_query_and_drop_chunks.out @@ -8,7 +8,7 @@ Fri Jan 03 10:30:00 2020 PST| 1| 1 Sun Jan 03 10:30:00 2021 PST| 2| 2 (2 rows) -step s1_wp_enable: SELECT debug_waitpoint_enable('expanded_chunks'); +step 
s1_wp_enable: SELECT debug_waitpoint_enable('hypertable_expansion_before_lock_chunk'); debug_waitpoint_enable ---------------------- @@ -21,7 +21,7 @@ count 1 (1 row) -step s1_wp_release: SELECT debug_waitpoint_release('expanded_chunks'); +step s1_wp_release: SELECT debug_waitpoint_release('hypertable_expansion_before_lock_chunk'); debug_waitpoint_release ----------------------- diff --git a/test/isolation/specs/concurrent_query_and_drop_chunks.spec b/test/isolation/specs/concurrent_query_and_drop_chunks.spec index 610916c4c..96769feae 100644 --- a/test/isolation/specs/concurrent_query_and_drop_chunks.spec +++ b/test/isolation/specs/concurrent_query_and_drop_chunks.spec @@ -19,9 +19,9 @@ teardown { # acqurired, the chunk should also be ignored. session "s1" -step "s1_wp_enable" { SELECT debug_waitpoint_enable('expanded_chunks'); } -step "s1_wp_release" { SELECT debug_waitpoint_release('expanded_chunks'); } -step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('measurements', TIMESTAMPTZ '2020-03-01'); } +step "s1_wp_enable" { SELECT debug_waitpoint_enable('hypertable_expansion_before_lock_chunk'); } +step "s1_wp_release" { SELECT debug_waitpoint_release('hypertable_expansion_before_lock_chunk'); } +step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('measurements', TIMESTAMPTZ '2020-03-01'); } session "s2" step "s2_show_num_chunks" { SELECT count(*) FROM show_chunks('measurements') ORDER BY 1; } diff --git a/tsl/src/planner.c b/tsl/src/planner.c index 074fb2de7..bf30fca5d 100644 --- a/tsl/src/planner.c +++ b/tsl/src/planner.c @@ -125,7 +125,7 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT if (ts_guc_enable_transparent_decompression && ht && (rel->reloptkind == RELOPT_OTHER_MEMBER_REL || (rel->reloptkind == RELOPT_BASEREL && ts_rte_is_marked_for_expansion(rte))) && - TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht) && fdw_private != NULL && fdw_private->compressed) + TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht)) { if 
(fdw_private->cached_chunk_struct == NULL) { diff --git a/tsl/test/isolation/expected/concurrent_decompress_update.out b/tsl/test/isolation/expected/concurrent_decompress_update.out new file mode 100644 index 000000000..c6fc9223a --- /dev/null +++ b/tsl/test/isolation/expected/concurrent_decompress_update.out @@ -0,0 +1,38 @@ +Parsed test spec with 2 sessions + +starting permutation: s2_explain_update s1_begin s1_decompress s2_explain_update s1_commit s2_explain_update +compression_status +------------------ +Compressed +(1 row) + +step s2_explain_update: + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; + +step s1_begin: + BEGIN; + +step s1_decompress: + SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); + +count +----- + 1 +(1 row) + +compression_status +------------------ +Uncompressed +(1 row) + +step s2_explain_update: + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; + +step s1_commit: + COMMIT; + +step s2_explain_update: <... 
completed> +step s2_explain_update: + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; + diff --git a/tsl/test/isolation/specs/CMakeLists.txt b/tsl/test/isolation/specs/CMakeLists.txt index 6f381b3aa..db5e80c29 100644 --- a/tsl/test/isolation/specs/CMakeLists.txt +++ b/tsl/test/isolation/specs/CMakeLists.txt @@ -18,6 +18,10 @@ list( cagg_concurrent_refresh_dist_ht.spec deadlock_drop_chunks_compress.spec) +if(PG_VERSION VERSION_GREATER_EQUAL "14.0") + list(APPEND TEST_FILES concurrent_decompress_update.spec) +endif() + if(CMAKE_BUILD_TYPE MATCHES Debug) list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG}) list( diff --git a/tsl/test/isolation/specs/concurrent_decompress_update.spec b/tsl/test/isolation/specs/concurrent_decompress_update.spec new file mode 100644 index 000000000..10bf6b5f6 --- /dev/null +++ b/tsl/test/isolation/specs/concurrent_decompress_update.spec @@ -0,0 +1,78 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +### +# This isolation test checks that SELECT queries can be performed in parallel to +# chunk decompression operations. This version of the isolation tests creates the +# default index on the time column. See the decompression_chunk_and_parallel_query_wo_idx +# test for a version without any index. +### + +setup { + + CREATE TABLE sensor_data ( + time timestamptz not null, + sensor_id integer not null, + cpu double precision null, + temperature double precision null); + + -- Create the hypertable + SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days'); + +-- SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE); + + + -- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is + -- used in this isolation test. 
In contrast to 'policy_compression_execute' all decompression + operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk' + could lead to deadlocks that do not occur in real-world scenarios (due to locks held on a completely + decompressed chunk). + + INSERT INTO sensor_data + SELECT time + (INTERVAL '1 minute' * random()) AS time, + sensor_id, + random() AS cpu, + random()* 100 AS temperature + FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '5 minute') AS g1(time), + generate_series(1, 10, 1) AS g2(sensor_id) + ORDER BY time; + + SELECT count(*) FROM sensor_data; + + ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id'); + + SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); +} + +teardown { + DROP TABLE sensor_data; +} + +session "s1" + +step "s1_begin" { + BEGIN; +} + +step "s1_decompress" { + SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i; + SELECT compression_status FROM chunk_compression_stats('sensor_data'); +} + +step "s1_commit" { + COMMIT; +} + +session "s2" + +step "s2_explain_update" { + UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111; +} + +# UPDATE/DELETE queries don't use TimescaleDB hypertable expansion, and use the +# Postgres expansion code, which might have different locking behavior. Test it +# as well.
+permutation "s2_explain_update" "s1_begin" "s1_decompress" "s2_explain_update" "s1_commit" "s2_explain_update" + diff --git a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in index 9e37cfdbf..784a908c0 100644 --- a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in +++ b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in @@ -34,8 +34,8 @@ setup { sensor_id, random() AS cpu, random()* 100 AS temperature - FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time), - generate_series(1, 50, 1) AS g2(sensor_id) + FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '5 minute') AS g1(time), + generate_series(1, 10, 1) AS g2(sensor_id) ORDER BY time; SELECT count(*) FROM sensor_data;