From 2858110f050c8bdf59064fe426639f746c58c031 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Mon, 4 Mar 2024 17:11:13 +0100 Subject: [PATCH] Fix NULL start value handling in CAgg refresh The CAgg refresh job did not handle the NULL value of start_offset for a time_bucket function with a variable width properly. This problem has led to the creation of invalid invalidation records and 'timestamp out of range' errors during the next refresh. Fixes: #5474 --- .unreleased/bugfix_6729 | 1 + tsl/src/bgw_policy/continuous_aggregate_api.c | 11 ++++- tsl/src/bgw_policy/continuous_aggregate_api.h | 4 +- tsl/src/bgw_policy/job.c | 6 ++- tsl/test/expected/cagg_invalidation.out | 41 ++++++++++++++++++- tsl/test/sql/cagg_invalidation.sql | 40 +++++++++++++++++- 6 files changed, 95 insertions(+), 8 deletions(-) create mode 100644 .unreleased/bugfix_6729 diff --git a/.unreleased/bugfix_6729 b/.unreleased/bugfix_6729 new file mode 100644 index 000000000..e1ded6ebf --- /dev/null +++ b/.unreleased/bugfix_6729 @@ -0,0 +1 @@ +Fixes: #6729 Fix NULL start value handling in CAgg refresh diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.c b/tsl/src/bgw_policy/continuous_aggregate_api.c index c209dc7eb..1e2477993 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.c +++ b/tsl/src/bgw_policy/continuous_aggregate_api.c @@ -111,13 +111,20 @@ get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json } int64 -policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, bool *start_isnull) +policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim, + const Jsonb *config, bool *start_isnull) { int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, start_isnull); /* interpret NULL as min value for that type */ if (*start_isnull) - return ts_time_get_min(ts_dimension_get_partition_type(dim)); + { + Oid type = ts_dimension_get_partition_type(dim); + + return ts_continuous_agg_bucket_width_variable(cagg) ? ts_time_get_nobegin_or_min(type) : + ts_time_get_min(type); + } + return res; } diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.h b/tsl/src/bgw_policy/continuous_aggregate_api.h index 5f481613b..a8329ddc7 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.h +++ b/tsl/src/bgw_policy/continuous_aggregate_api.h @@ -16,8 +16,8 @@ extern Datum policy_refresh_cagg_check(PG_FUNCTION_ARGS); extern Datum policy_refresh_cagg_remove(PG_FUNCTION_ARGS); int32 policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config); -int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, - bool *start_isnull); +int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim, + const Jsonb *config, bool *start_isnull); int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config, bool *end_isnull); bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type, diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index 5d27b7d20..ef9a306c2 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -401,9 +401,11 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD errmsg("configuration materialization hypertable id %d not found", materialization_id))); + ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false); + open_dim = get_open_dimension_for_hypertable(mat_ht, true); dim_type = ts_dimension_get_partition_type(open_dim); - refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config, &start_isnull); + refresh_start = policy_refresh_cagg_get_refresh_start(cagg, open_dim, config, &start_isnull); refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config, &end_isnull); if (refresh_start >= refresh_end) @@ -420,7 +422,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD policy_data->refresh_window.type = dim_type; policy_data->refresh_window.start = refresh_start; policy_data->refresh_window.end = refresh_end; - policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false); + policy_data->cagg = cagg; policy_data->start_is_null = start_isnull; policy_data->end_is_null = end_isnull; } diff --git a/tsl/test/expected/cagg_invalidation.out b/tsl/test/expected/cagg_invalidation.out index ebf886607..87ac5fc4f 100644 --- a/tsl/test/expected/cagg_invalidation.out +++ b/tsl/test/expected/cagg_invalidation.out @@ -1189,7 +1189,7 @@ WHERE cagg_id = :cond_10_id; -- should trigger two individual refreshes CALL refresh_continuous_aggregate('cond_10', 0, 200); --- Allow at most 5 individual invalidations per refreshe +-- Allow at most 5 individual invalidations per refresh SET timescaledb.materializations_per_refresh_window=5; -- Insert into every second bucket INSERT INTO conditions VALUES (20, 1, 1.0); @@ -1226,6 +1226,7 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200); WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "-". \set VERBOSITY terse +RESET timescaledb.materializations_per_refresh_window; -- Test refresh with undefined invalidation threshold and variable sized buckets CREATE TABLE timestamp_ht ( time timestamptz NOT NULL, @@ -1275,3 +1276,41 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts FROM temperature_4h_2 GROUP BY 1 ORDER BY 1; NOTICE: continuous aggregate "temperature_1month_hierarchical_ts" is already up-to-date +--------------------------------------------------------------------- +--- Issue 5474 +--------------------------------------------------------------------- +CREATE TABLE i5474 ( +time timestamptz NOT NULL, +sensor_id integer NOT NULL, +cpu double precision NOT NULL, +temperature double precision NOT NULL); +SELECT create_hypertable('i5474','time'); + create_hypertable +--------------------- + (16,public,i5474,t) +(1 row) + +CREATE MATERIALIZED VIEW i5474_summary_daily + WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 hour', time, 'AWST') AS bucket, + sensor_id, + avg(cpu) AS avg_cpu + FROM i5474 + GROUP BY bucket, sensor_id; +NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date +SELECT add_continuous_aggregate_policy('i5474_summary_daily', + start_offset => NULL, + end_offset => INTERVAL '10 MINUTES', + schedule_interval => INTERVAL '1 MINUTE' +) new_job_id \gset +-- Check that start_offset = NULL is handled properly by the refresh job... +CALL run_job(:new_job_id); +-- ...and the CAgg can be refreshed afterward +CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00'); +NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date +INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0); +CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00'); +-- CAgg should be up-to-date now +CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00'); +NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date diff --git a/tsl/test/sql/cagg_invalidation.sql b/tsl/test/sql/cagg_invalidation.sql index 133c2b242..d9bdee8a8 100644 --- a/tsl/test/sql/cagg_invalidation.sql +++ b/tsl/test/sql/cagg_invalidation.sql @@ -683,7 +683,7 @@ WHERE cagg_id = :cond_10_id; -- should trigger two individual refreshes CALL refresh_continuous_aggregate('cond_10', 0, 200); --- Allow at most 5 individual invalidations per refreshe +-- Allow at most 5 individual invalidations per refresh SET timescaledb.materializations_per_refresh_window=5; -- Insert into every second bucket @@ -719,6 +719,7 @@ SET timescaledb.materializations_per_refresh_window='-'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); \set VERBOSITY terse +RESET timescaledb.materializations_per_refresh_window; -- Test refresh with undefined invalidation threshold and variable sized buckets CREATE TABLE timestamp_ht ( @@ -765,3 +766,40 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts SELECT time_bucket('1 month', bucket_4h, 'Europe/Berlin'), avg(average) FROM temperature_4h_2 GROUP BY 1 ORDER BY 1; + +--------------------------------------------------------------------- +--- Issue 5474 +--------------------------------------------------------------------- +CREATE TABLE i5474 ( +time timestamptz NOT NULL, +sensor_id integer NOT NULL, +cpu double precision NOT NULL, +temperature double precision NOT NULL); + +SELECT create_hypertable('i5474','time'); + +CREATE MATERIALIZED VIEW i5474_summary_daily + WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 hour', time, 'AWST') AS bucket, + sensor_id, + avg(cpu) AS avg_cpu + FROM i5474 + GROUP BY bucket, sensor_id; + +SELECT add_continuous_aggregate_policy('i5474_summary_daily', + start_offset => NULL, + end_offset => INTERVAL '10 MINUTES', + schedule_interval => INTERVAL '1 MINUTE' +) new_job_id \gset + +-- Check that start_offset = NULL is handled properly by the refresh job... +CALL run_job(:new_job_id); + +-- ...and the CAgg can be refreshed afterward +CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00'); +INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0); +CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00'); + +-- CAgg should be up-to-date now +CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');