From 81e2f35d4b8d52ed3381cff846556611a8974cf9 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Tue, 20 Jun 2023 12:58:31 +0200 Subject: [PATCH] Mark cagg_watermark as PARALLEL RESTRICTED MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch marks the function cagg_watermark as PARALLEL RESTRICTED. It partially reverts the change of c0f2ed18095f21ac737f96fe93e4035dbfeeaf2c. The reason is as follows: for transaction isolation levels < REPEATABLE READ it can not be ensured that parallel worker reads the same watermark (e.g., using read committed isolation level: worker A reads the watermark, the CAGG is refreshed and the watermark changes, worker B reads the newer watermark). The different views on the CAGG can cause unexpected results and crashes (e.g., the chunk exclusion excludes different chunks in worker A and in worker B). In addition, a correct snapshot is used when the watermark is read from the CAGG and a TAP test is added, which detects inconsistent watermark reads. 
Co-authored-by: Fabrízio de Royes Mello Co-authored-by: Zoltan Haindrich --- .unreleased/bugfix_5804 | 1 + sql/util_time.sql | 2 +- src/ts_catalog/continuous_aggs_watermark.c | 18 +++ tsl/test/expected/continuous_aggs-12.out | 46 +++--- tsl/test/expected/continuous_aggs-13.out | 46 +++--- tsl/test/expected/continuous_aggs-14.out | 46 +++--- tsl/test/expected/continuous_aggs-15.out | 48 +++--- tsl/test/t/008_mvcc_cagg.pl | 179 +++++++++++++++++++++ tsl/test/t/CMakeLists.txt | 2 +- 9 files changed, 281 insertions(+), 107 deletions(-) create mode 100644 .unreleased/bugfix_5804 create mode 100644 tsl/test/t/008_mvcc_cagg.pl diff --git a/.unreleased/bugfix_5804 b/.unreleased/bugfix_5804 new file mode 100644 index 000000000..d33cf75b2 --- /dev/null +++ b/.unreleased/bugfix_5804 @@ -0,0 +1 @@ +Fixes: #5804 Mark cagg_watermark function as PARALLEL RESTRICTED diff --git a/sql/util_time.sql b/sql/util_time.sql index 6645971b9..e9027033a 100644 --- a/sql/util_time.sql +++ b/sql/util_time.sql @@ -39,7 +39,7 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.time_to_internal(time_val ANYEL RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE STRICT; CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark(hypertable_id INTEGER) -RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL SAFE; +RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL RESTRICTED; CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark_materialized(hypertable_id INTEGER) RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark_materialized' LANGUAGE C STABLE STRICT PARALLEL SAFE; diff --git a/src/ts_catalog/continuous_aggs_watermark.c b/src/ts_catalog/continuous_aggs_watermark.c index a66a54ce3..88c34442b 100644 --- a/src/ts_catalog/continuous_aggs_watermark.c +++ b/src/ts_catalog/continuous_aggs_watermark.c @@ -13,6 +13,7 @@ #include #include #include 
+#include #include "ts_catalog/continuous_agg.h" #include "ts_catalog/continuous_aggs_watermark.h" @@ -88,6 +89,16 @@ cagg_watermark_get(Hypertable *mat_ht) ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, AccessShareLock, CurrentMemoryContext); + /* + * The watermark of a CAGG has to be fetched by using the transaction snapshot. + * + * By default, the ts_scanner uses the SnapshotSelf to perform a scan. However, reading the + * watermark must be done using the transaction snapshot in order to ensure that the view on the + * watermark and the materialized part of the CAGG match. + */ + iterator.ctx.snapshot = GetTransactionSnapshot(); + Assert(iterator.ctx.snapshot != NULL); + cagg_watermark_init_scan_by_mat_hypertable_id(&iterator, mat_ht->fd.id); ts_scanner_foreach(&iterator) @@ -105,6 +116,13 @@ cagg_watermark_get(Hypertable *mat_ht) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("watermark not defined for continuous aggregate: %d", mat_ht->fd.id))); + /* Log the read watermark, needed for MVCC tap tests */ + ereport(DEBUG5, + (errcode(ERRCODE_SUCCESSFUL_COMPLETION), + errmsg("watermark for continuous aggregate, '%d' is: " INT64_FORMAT, + mat_ht->fd.id, + DatumGetInt64(watermark)))); + return DatumGetInt64(watermark); } diff --git a/tsl/test/expected/continuous_aggs-12.out b/tsl/test/expected/continuous_aggs-12.out index 8c5fde7af..b5485eef1 100644 --- a/tsl/test/expected/continuous_aggs-12.out +++ b/tsl/test/expected/continuous_aggs-12.out @@ -2315,34 +2315,28 @@ SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; -- Parallel planning EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01'; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN 
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Merge Append Sort Key: _materialized_hypertable_64.sum DESC - -> Gather Merge - Workers Planned: 2 - -> Sort - Sort Key: _materialized_hypertable_64.sum DESC - -> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64 - Chunks excluded during startup: 0 - -> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Sort + Sort Key: _materialized_hypertable_64.sum DESC + -> Custom Scan (ChunkAppend) on _materialized_hypertable_64 + Chunks excluded during startup: 0 + -> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time 
zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) -> Sort Sort Key: (sum(conditions.temperature)) DESC - -> Finalize HashAggregate - Group Key: (time_bucket('@ 1 day'::interval, conditions.timec)) - -> Gather - Workers Planned: 4 - -> Partial HashAggregate - Group Key: time_bucket('@ 1 day'::interval, conditions.timec) - -> Parallel Custom Scan (ChunkAppend) on conditions - Chunks excluded during startup: 26 - -> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk - Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) -(27 rows) + -> HashAggregate + Group Key: time_bucket('@ 1 day'::interval, conditions.timec) + -> Custom Scan (ChunkAppend) on conditions + Chunks excluded during startup: 26 + -> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk + Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time 
zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) +(21 rows) diff --git a/tsl/test/expected/continuous_aggs-13.out b/tsl/test/expected/continuous_aggs-13.out index 8c5fde7af..b5485eef1 100644 --- a/tsl/test/expected/continuous_aggs-13.out +++ b/tsl/test/expected/continuous_aggs-13.out @@ -2315,34 +2315,28 @@ SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; -- Parallel planning EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01'; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Merge Append Sort Key: _materialized_hypertable_64.sum DESC - -> Gather Merge - Workers Planned: 2 - -> Sort - Sort Key: _materialized_hypertable_64.sum DESC - -> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64 - Chunks excluded during startup: 0 - -> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk - Index Cond: ((time_bucket < 
COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Sort + Sort Key: _materialized_hypertable_64.sum DESC + -> Custom Scan (ChunkAppend) on _materialized_hypertable_64 + Chunks excluded during startup: 0 + -> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) -> Sort Sort Key: (sum(conditions.temperature)) DESC - -> Finalize HashAggregate - Group Key: (time_bucket('@ 1 day'::interval, conditions.timec)) - -> Gather - Workers Planned: 4 - -> Partial HashAggregate - Group Key: time_bucket('@ 1 
day'::interval, conditions.timec) - -> Parallel Custom Scan (ChunkAppend) on conditions - Chunks excluded during startup: 26 - -> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk - Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) -(27 rows) + -> HashAggregate + Group Key: time_bucket('@ 1 day'::interval, conditions.timec) + -> Custom Scan (ChunkAppend) on conditions + Chunks excluded during startup: 26 + -> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk + Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) +(21 rows) diff --git a/tsl/test/expected/continuous_aggs-14.out b/tsl/test/expected/continuous_aggs-14.out index 8edb85829..4975fc0a8 100644 --- a/tsl/test/expected/continuous_aggs-14.out +++ b/tsl/test/expected/continuous_aggs-14.out @@ -2314,34 +2314,28 @@ SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; -- Parallel planning EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01'; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN 
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Merge Append Sort Key: _materialized_hypertable_64.sum DESC - -> Gather Merge - Workers Planned: 2 - -> Sort - Sort Key: _materialized_hypertable_64.sum DESC - -> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64 - Chunks excluded during startup: 0 - -> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Sort + Sort Key: _materialized_hypertable_64.sum DESC + -> Custom Scan (ChunkAppend) on _materialized_hypertable_64 + Chunks excluded during startup: 0 + -> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time 
zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) -> Sort Sort Key: (sum(conditions.temperature)) DESC - -> Finalize HashAggregate - Group Key: (time_bucket('@ 1 day'::interval, conditions.timec)) - -> Gather - Workers Planned: 4 - -> Partial HashAggregate - Group Key: time_bucket('@ 1 day'::interval, conditions.timec) - -> Parallel Custom Scan (ChunkAppend) on conditions - Chunks excluded during startup: 26 - -> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk - Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) -(27 rows) + -> HashAggregate + Group Key: time_bucket('@ 1 day'::interval, conditions.timec) + -> Custom Scan (ChunkAppend) on conditions + Chunks excluded during startup: 26 + -> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk + Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time 
zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) +(21 rows) diff --git a/tsl/test/expected/continuous_aggs-15.out b/tsl/test/expected/continuous_aggs-15.out index 56d844734..c42f528ad 100644 --- a/tsl/test/expected/continuous_aggs-15.out +++ b/tsl/test/expected/continuous_aggs-15.out @@ -2316,35 +2316,29 @@ SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; -- Parallel planning EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01'; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Merge Append Sort Key: _materialized_hypertable_64.sum DESC - -> Gather Merge - Workers Planned: 2 - -> Sort - Sort Key: _materialized_hypertable_64.sum DESC - -> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64 - Chunks excluded during startup: 0 - -> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk - Index Cond: ((time_bucket < 
COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - -> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk - Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Sort + Sort Key: _materialized_hypertable_64.sum DESC + -> Custom Scan (ChunkAppend) on _materialized_hypertable_64 + Chunks excluded during startup: 0 + -> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + -> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk + Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) -> Sort Sort Key: (sum(conditions.temperature)) DESC - -> Finalize HashAggregate - Group Key: (time_bucket('@ 1 day'::interval, conditions.timec)) - -> Gather - Workers Planned: 4 - -> Partial HashAggregate - Group Key: time_bucket('@ 1 
day'::interval, conditions.timec) - -> Result - -> Parallel Custom Scan (ChunkAppend) on conditions - Chunks excluded during startup: 26 - -> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk - Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) - Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) -(28 rows) + -> HashAggregate + Group Key: time_bucket('@ 1 day'::interval, conditions.timec) + -> Result + -> Custom Scan (ChunkAppend) on conditions + Chunks excluded during startup: 26 + -> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk + Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)) + Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone) +(22 rows) diff --git a/tsl/test/t/008_mvcc_cagg.pl b/tsl/test/t/008_mvcc_cagg.pl new file mode 100644 index 000000000..8c797fda8 --- /dev/null +++ b/tsl/test/t/008_mvcc_cagg.pl @@ -0,0 +1,179 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +# This TAP test tests the MVCC behavior of the watermark function of the CAGGs +# It creates a hypertable, a CAGG, and a refresh policy. Afterward, it inserts +# into the hypertable and checks that all workers of a parallel query see the same +# watermark. 
+ +use strict; +use warnings; +use TimescaleNode; +use Data::Dumper; +use Test::More tests => 5; + +# See debug output in 'tsl/test/tmp_check/log/regress_log_008_mvcc_cagg' +my $debug = 0; + +# Should be enabled as soon as _timescaledb_internal.cagg_watermark is declared as parallel safe +my $check_for_parallel_plan = 0; + +my $timescale_node = TimescaleNode->create('insert'); + +# Test 1 - Create table +my $result = $timescale_node->safe_psql( + 'postgres', q{ + CREATE TABLE adapter_metrics ( + time timestamptz NOT NULL, + adapter_token text NOT NULL, + dyno text NOT NULL, + queue text NOT NULL, + value double precision NOT NULL); +} +); +is($result, '', 'create table'); + +# Test 2 - Convert to hypertable +$result = $timescale_node->safe_psql( + 'postgres', q{ + SELECT FROM create_hypertable('adapter_metrics','time', chunk_time_interval => INTERVAL '5 seconds'); +} +); +is($result, '', 'create hypertable'); + +# Test 3 - Create CAGG +$result = $timescale_node->safe_psql( + 'postgres', q{ + CREATE MATERIALIZED VIEW adapter_metrics_rollup WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS + SELECT + time_bucket('00:00:05'::interval, adapter_metrics.time) AS bucket, + avg(adapter_metrics.value) AS avg_value, + max(adapter_metrics.value) AS max_value, + count(*) AS count + FROM adapter_metrics + GROUP BY time_bucket('00:00:05'::interval, adapter_metrics.time) + WITH DATA; +} +); +is($result, '', 'create cagg'); + +# Test 4 - Create CAGG policy +$result = $timescale_node->safe_psql( + 'postgres', q{ + SELECT add_continuous_aggregate_policy('adapter_metrics_rollup', + start_offset => INTERVAL '20 seconds', + end_offset => NULL, + schedule_interval => INTERVAL '1 seconds'); +} +); +is($result, '1000', 'job id of the cagg'); + +# Test 5 - Test consistent CAGG watermarks +for (my $iteration = 0; $iteration < 10; $iteration++) +{ + # Insert data + $timescale_node->safe_psql( + 'postgres', q{ + INSERT INTO adapter_metrics + SELECT + now(), + random()::text 
AS dyno, + random()::text AS queue, + random()::text AS adapter_token, + value + FROM + generate_series(1, 100, 1) AS g2(value); + } + ); + + # Perform selects while the CAGG update is running + for (my $query = 0; $query < 10; $query++) + { + # Encourage the use of parallel workers + my $sql = q{ + SET force_parallel_mode = 1; + SET enable_bitmapscan = 0; + SET parallel_setup_cost = 0; + SET parallel_tuple_cost = 0; + SET parallel_tuple_cost = 0; + SET client_min_messages TO DEBUG5; + SELECT * FROM adapter_metrics_rollup WHERE adapter_metrics_rollup.bucket > now() - INTERVAL '30 seconds'; + }; + my ($stdout, $stderr); + + if ($debug == 1) + { + print "===== New query\n"; + } + + # To be able to access stderr, use psql instead of safe_psql + $timescale_node->psql( + 'postgres', $sql, + stdout => \$stdout, + stderr => \$stderr, + on_error_die => 1, + on_error_stop => 1); + + # Ensure all parallel workers have seen the same watermark + my @log_lines = split "\n", $stderr; + my $seen_watermarks = 0; + my $cagg_id = -1; + my $watermark = -1; + + # Check that all workers have seen the same watermark + foreach (@log_lines) + { + if (/watermark for continuous aggregate .+'(\d+)' is: (\d+)/) + { + my $current_cagg = $1; + my $current_watermark = $2; + + if ($debug == 1) + { + print $_ . "\n"; + } + + if ($watermark == -1) + { + $watermark = $current_watermark; + $cagg_id = $current_cagg; + } + elsif ($watermark != $current_watermark) + { + diag( + "Some workers have read the watermark $watermark, one worker read the watermark $current_watermark\n" + ); + fail(); + BAIL_OUT(); + } + elsif ($cagg_id != $current_cagg) + { + diag( + "Got watermarks for different CAGGs, this test can not handle this ($cagg_id / $current_cagg)\n" + ); + fail(); + BAIL_OUT(); + } + + $seen_watermarks++; + } + } + + if ($check_for_parallel_plan == 1 && $seen_watermarks < 2) + { + diag( + "Got watermark only from one worker, was a parallel plan used?" 
+ ); + fail(); + BAIL_OUT(); + } + } +} + +# No errors before? Let test 5 pass +pass(); + +done_testing(); + +1; diff --git a/tsl/test/t/CMakeLists.txt b/tsl/test/t/CMakeLists.txt index 57c45e820..e905cd362 100644 --- a/tsl/test/t/CMakeLists.txt +++ b/tsl/test/t/CMakeLists.txt @@ -1,7 +1,7 @@ set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl) set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl 005_add_data_node.pl - 006_job_crash_log.pl 007_healthcheck.pl) + 006_job_crash_log.pl 007_healthcheck.pl 008_mvcc_cagg.pl) if(CMAKE_BUILD_TYPE MATCHES Debug) list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES})