1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-20 20:54:29 +08:00

Reread the catalog data after locking the chunk

The compression status can change, and this is prevented by locking, so
to keep our data consistent, we should reread the chunk metadata after
we have locked it. Currently we have some code in place that masks this
inconsistency by rereading the metadata in another place. This code is
removed.
This commit is contained in:
Alexander Kuzmenkov 2023-08-29 12:39:17 +02:00
parent a9751ccd5e
commit 623381ce99
11 changed files with 191 additions and 108 deletions

@ -8,6 +8,7 @@ name: Sanitizer test
branches:
- main
- prerelease_test
- trigger/sanitizer
pull_request:
paths: .github/workflows/sanitizer-build-and-test.yaml

@ -43,123 +43,98 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned
{
MemoryContext work_mcxt =
AllocSetContextCreate(CurrentMemoryContext, "chunk-scan-work", ALLOCSET_DEFAULT_SIZES);
MemoryContext per_tuple_mcxt =
AllocSetContextCreate(work_mcxt, "chunk-scan-per-tuple", ALLOCSET_SMALL_SIZES);
MemoryContext orig_mcxt;
Chunk **locked_chunks = NULL;
Chunk **unlocked_chunks = NULL;
int locked_chunk_count = 0;
int unlocked_chunk_count = 0;
ListCell *lc;
int remote_chunk_count = 0;
Assert(OidIsValid(hs->main_table_relid));
orig_mcxt = MemoryContextSwitchTo(work_mcxt);
MemoryContext orig_mcxt = MemoryContextSwitchTo(work_mcxt);
/*
* For each matching chunk, fill in the metadata from the "chunk" table.
* Make sure to filter out "dropped" chunks.
*/
ScanIterator chunk_it = ts_chunk_scan_iterator_create(orig_mcxt);
unlocked_chunks = MemoryContextAlloc(work_mcxt, sizeof(Chunk *) * list_length(chunk_ids));
locked_chunks = MemoryContextAlloc(orig_mcxt, sizeof(Chunk *) * list_length(chunk_ids));
foreach (lc, chunk_ids)
{
int chunk_id = lfirst_int(lc);
TupleInfo *ti;
Assert(CurrentMemoryContext == work_mcxt);
ts_chunk_scan_iterator_set_chunk_id(&chunk_it, chunk_id);
ts_scan_iterator_start_or_restart_scan(&chunk_it);
ti = ts_scan_iterator_next(&chunk_it);
if (ti)
TupleInfo *ti = ts_scan_iterator_next(&chunk_it);
if (ti == NULL)
{
bool isnull;
Datum datum = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull);
bool is_dropped = isnull ? false : DatumGetBool(datum);
MemoryContextSwitchTo(per_tuple_mcxt);
MemoryContextReset(per_tuple_mcxt);
if (!is_dropped)
{
Chunk *chunk = MemoryContextAllocZero(orig_mcxt, sizeof(Chunk));
MemoryContext old_mcxt = MemoryContextSwitchTo(ti->mctx);
ts_chunk_formdata_fill(&chunk->fd, ti);
MemoryContextSwitchTo(old_mcxt);
chunk->constraints = NULL;
chunk->cube = NULL;
chunk->hypertable_relid = hs->main_table_relid;
unlocked_chunks[unlocked_chunk_count] = chunk;
unlocked_chunk_count++;
}
MemoryContextSwitchTo(work_mcxt);
/* Only one chunk should match */
Assert(ts_scan_iterator_next(&chunk_it) == NULL);
continue;
}
bool isnull;
Datum datum = slot_getattr(ti->slot, Anum_chunk_dropped, &isnull);
const bool is_dropped = isnull ? false : DatumGetBool(datum);
if (is_dropped)
{
continue;
}
/* We found a chunk that is not dropped. First, try to lock it. */
Name schema_name = DatumGetName(slot_getattr(ti->slot, Anum_chunk_schema_name, &isnull));
Assert(!isnull);
Name table_name = DatumGetName(slot_getattr(ti->slot, Anum_chunk_table_name, &isnull));
Assert(!isnull);
Oid chunk_reloid = ts_get_relation_relid(NameStr(*schema_name),
NameStr(*table_name),
/* return_invalid = */ false);
Assert(OidIsValid(chunk_reloid));
/* Only one chunk should match */
Assert(ts_scan_iterator_next(&chunk_it) == NULL);
DEBUG_WAITPOINT("hypertable_expansion_before_lock_chunk");
if (!ts_chunk_lock_if_exists(chunk_reloid, AccessShareLock))
{
continue;
}
/*
* Now after we have locked the chunk, we have to reread its metadata.
* It might have been modified concurrently by decompression, for
* example.
*/
ts_chunk_scan_iterator_set_chunk_id(&chunk_it, chunk_id);
ts_scan_iterator_start_or_restart_scan(&chunk_it);
ti = ts_scan_iterator_next(&chunk_it);
Assert(ti != NULL);
Chunk *chunk = MemoryContextAllocZero(orig_mcxt, sizeof(Chunk));
ts_chunk_formdata_fill(&chunk->fd, ti);
chunk->constraints = NULL;
chunk->cube = NULL;
chunk->hypertable_relid = hs->main_table_relid;
chunk->table_id = chunk_reloid;
locked_chunks[locked_chunk_count] = chunk;
locked_chunk_count++;
/* Only one chunk should match */
Assert(ts_scan_iterator_next(&chunk_it) == NULL);
}
ts_scan_iterator_close(&chunk_it);
Assert(unlocked_chunk_count == 0 || unlocked_chunks != NULL);
Assert(unlocked_chunk_count <= list_length(chunk_ids));
Assert(locked_chunk_count == 0 || locked_chunks != NULL);
Assert(locked_chunk_count <= list_length(chunk_ids));
Assert(CurrentMemoryContext == work_mcxt);
DEBUG_WAITPOINT("expanded_chunks");
/*
* Batch the lookups to each catalog cache to have more favorable access
* patterns.
* Schema oid isn't likely to change, so cache it.
*/
char *last_schema_name = NULL;
for (int i = 0; i < unlocked_chunk_count; i++)
for (int i = 0; i < locked_chunk_count; i++)
{
Chunk *chunk = unlocked_chunks[i];
char *current_schema_name = NameStr(chunk->fd.schema_name);
if (last_schema_name == NULL || strcmp(last_schema_name, current_schema_name) != 0)
{
last_schema_name = current_schema_name;
}
chunk->table_id =
ts_get_relation_relid(last_schema_name, NameStr(chunk->fd.table_name), false);
Assert(OidIsValid(chunk->table_id));
}
for (int i = 0; i < unlocked_chunk_count; i++)
{
Chunk *chunk = unlocked_chunks[i];
Chunk *chunk = locked_chunks[i];
chunk->relkind = get_rel_relkind(chunk->table_id);
}
/*
* Lock the chunks.
*/
for (int i = 0; i < unlocked_chunk_count; i++)
{
Chunk *chunk = unlocked_chunks[i];
if (ts_chunk_lock_if_exists(chunk->table_id, AccessShareLock))
{
/* Lazy initialize the chunks array */
if (NULL == locked_chunks)
locked_chunks =
MemoryContextAlloc(orig_mcxt, sizeof(Chunk *) * unlocked_chunk_count);
locked_chunks[locked_chunk_count] = chunk;
if (chunk->relkind == RELKIND_FOREIGN_TABLE)
remote_chunk_count++;
locked_chunk_count++;
}
if (chunk->relkind == RELKIND_FOREIGN_TABLE)
remote_chunk_count++;
}
/*
@ -178,9 +153,7 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned
while (ts_scan_iterator_next(&constr_it) != NULL)
{
TupleInfo *constr_ti = ts_scan_iterator_tuple_info(&constr_it);
MemoryContextSwitchTo(per_tuple_mcxt);
ts_chunk_constraints_add_from_tuple(chunk->constraints, constr_ti);
MemoryContextSwitchTo(work_mcxt);
}
}
ts_scan_iterator_close(&constr_it);
@ -264,9 +237,6 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned
MemoryContext old_mcxt;
HeapTuple tuple;
MemoryContextSwitchTo(per_tuple_mcxt);
MemoryContextReset(per_tuple_mcxt);
tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
form = (Form_chunk_data_node) GETSTRUCT(tuple);
old_mcxt = MemoryContextSwitchTo(ti->mctx);
@ -280,8 +250,6 @@ ts_chunk_scan_by_chunk_ids(const Hyperspace *hs, const List *chunk_ids, unsigned
if (should_free)
heap_freetuple(tuple);
MemoryContextSwitchTo(work_mcxt);
}
}
}

@ -1320,10 +1320,6 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo
if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
Relation uncompressed_chunk = table_open(relation_objectid, NoLock);
ts_get_private_reloptinfo(rel)->compressed = true;
/* Planning indexes is expensive, and if this is a fully compressed chunk, we
* know we'll never need to use indexes on the uncompressed version, since
* all the data is in the compressed chunk anyway. Therefore, it is much
@ -1334,7 +1330,6 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo
*/
if (!ts_chunk_is_partial(chunk))
rel->indexlist = NIL;
table_close(uncompressed_chunk, NoLock);
}
}
break;

@ -33,7 +33,6 @@ typedef struct TimescaleDBPrivate
/* attno of the time dimension in the parent table if appends are ordered */
int order_attno;
List *nested_oids;
bool compressed;
List *chunk_oids;
List *serverids;
Relids server_relids;

@ -8,7 +8,7 @@ Fri Jan 03 10:30:00 2020 PST| 1| 1
Sun Jan 03 10:30:00 2021 PST| 2| 2
(2 rows)
step s1_wp_enable: SELECT debug_waitpoint_enable('expanded_chunks');
step s1_wp_enable: SELECT debug_waitpoint_enable('hypertable_expansion_before_lock_chunk');
debug_waitpoint_enable
----------------------
@ -21,7 +21,7 @@ count
1
(1 row)
step s1_wp_release: SELECT debug_waitpoint_release('expanded_chunks');
step s1_wp_release: SELECT debug_waitpoint_release('hypertable_expansion_before_lock_chunk');
debug_waitpoint_release
-----------------------

@ -19,9 +19,9 @@ teardown {
# acqurired, the chunk should also be ignored.
session "s1"
step "s1_wp_enable" { SELECT debug_waitpoint_enable('expanded_chunks'); }
step "s1_wp_release" { SELECT debug_waitpoint_release('expanded_chunks'); }
step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('measurements', TIMESTAMPTZ '2020-03-01'); }
step "s1_wp_enable" { SELECT debug_waitpoint_enable('hypertable_expansion_before_lock_chunk'); }
step "s1_wp_release" { SELECT debug_waitpoint_release('hypertable_expansion_before_lock_chunk'); }
step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('measurements', TIMESTAMPTZ '2020-03-01'); }
session "s2"
step "s2_show_num_chunks" { SELECT count(*) FROM show_chunks('measurements') ORDER BY 1; }

@ -125,7 +125,7 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT
if (ts_guc_enable_transparent_decompression && ht &&
(rel->reloptkind == RELOPT_OTHER_MEMBER_REL ||
(rel->reloptkind == RELOPT_BASEREL && ts_rte_is_marked_for_expansion(rte))) &&
TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht) && fdw_private != NULL && fdw_private->compressed)
TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
{
if (fdw_private->cached_chunk_struct == NULL)
{

@ -0,0 +1,38 @@
Parsed test spec with 2 sessions
starting permutation: s2_explain_update s1_begin s1_decompress s2_explain_update s1_commit s2_explain_update
compression_status
------------------
Compressed
(1 row)
step s2_explain_update:
UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111;
step s1_begin:
BEGIN;
step s1_decompress:
SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
count
-----
1
(1 row)
compression_status
------------------
Uncompressed
(1 row)
step s2_explain_update:
UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111;
<waiting ...>
step s1_commit:
COMMIT;
step s2_explain_update: <... completed>
step s2_explain_update:
UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111;

@ -18,6 +18,10 @@ list(
cagg_concurrent_refresh_dist_ht.spec
deadlock_drop_chunks_compress.spec)
if(PG_VERSION VERSION_GREATER_EQUAL "14.0")
list(APPEND TEST_FILES concurrent_decompress_update.spec)
endif()
if(CMAKE_BUILD_TYPE MATCHES Debug)
list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG})
list(

@ -0,0 +1,78 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.
###
# This isolation test checks that SELECT queries can be performed in parallel to
# chunk decompression operations. This version of the isolation tests creates the
# default index on the time column. See the decompression_chunk_and_parallel_query_wo_idx
# test for a version without any index.
###
setup {
CREATE TABLE sensor_data (
time timestamptz not null,
sensor_id integer not null,
cpu double precision null,
temperature double precision null);
-- Create the hypertable
SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days');
-- SELECT FROM create_hypertable('sensor_data','time', chunk_time_interval => INTERVAL '60 days', create_default_indexes => FALSE);
-- All generated data is part of one chunk. Only one chunk is used because 'compress_chunk' is
-- used in this isolation test. In contrast to 'policy_compression_execute' all decompression
-- operations are executed in one transaction. So, processing more than one chunk with 'compress_chunk'
-- could lead to deadlocks that do not occur real-world scenarios (due to locks hold on a completely
-- decompressed chunk).
INSERT INTO sensor_data
SELECT time + (INTERVAL '1 minute' * random()) AS time,
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '5 minute') AS g1(time),
generate_series(1, 10, 1) AS g2(sensor_id)
ORDER BY time;
SELECT count(*) FROM sensor_data;
ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'sensor_id');
SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
}
teardown {
DROP TABLE sensor_data;
}
session "s1"
step "s1_begin" {
BEGIN;
}
step "s1_decompress" {
SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
SELECT compression_status FROM chunk_compression_stats('sensor_data');
}
step "s1_commit" {
COMMIT;
}
session "s2"
step "s2_explain_update" {
UPDATE sensor_data SET cpu = cpu + 1 WHERE cpu = 0.1111111;
}
# UPDATE/DELETE queries don't use TimescaleDB hypertable expansion, and use the
# Postgres expansion code, which might have different locking behavior. Test it
# as well.
permutation "s2_explain_update" "s1_begin" "s1_decompress" "s2_explain_update" "s1_commit" "s2_explain_update"

@ -34,8 +34,8 @@ setup {
sensor_id,
random() AS cpu,
random()* 100 AS temperature
FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '1 minute') AS g1(time),
generate_series(1, 50, 1) AS g2(sensor_id)
FROM generate_series('2022-01-01', '2022-01-15', INTERVAL '5 minute') AS g1(time),
generate_series(1, 10, 1) AS g2(sensor_id)
ORDER BY time;
SELECT count(*) FROM sensor_data;