Fix NULL start value handling in CAgg refresh
The CAgg refresh job did not properly handle a NULL start_offset when the continuous aggregate uses a time_bucket function with a variable width. This led to the creation of invalid invalidation records and to 'timestamp out of range' errors during the next refresh.

Fixes: #5474
parent a1cd7c45f6
commit 2858110f05
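
For context, a minimal reproduction sketch of the scenario described above, mirroring the regression test added by this commit. It assumes a database with the timescaledb extension installed and is meant to be run from psql (\gset and the :job_id variable are psql features); the table and view names (metrics, metrics_hourly) are illustrative and not part of the commit.

-- A hypertable with a continuous aggregate whose bucket uses a timezone,
-- i.e. a time_bucket with a variable width.
CREATE TABLE metrics (
    time timestamptz NOT NULL,
    device_id integer NOT NULL,
    cpu double precision NOT NULL);
SELECT create_hypertable('metrics', 'time');

CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time, 'AWST') AS bucket,
    device_id,
    avg(cpu) AS avg_cpu
FROM metrics
GROUP BY bucket, device_id;

-- A refresh policy with start_offset => NULL; before this fix, running the
-- job could create invalid invalidation records and lead to 'timestamp out
-- of range' errors on the next refresh.
SELECT add_continuous_aggregate_policy('metrics_hourly',
    start_offset => NULL,
    end_offset => INTERVAL '10 minutes',
    schedule_interval => INTERVAL '1 minute') AS job_id \gset

CALL run_job(:job_id);
CALL refresh_continuous_aggregate('metrics_hourly', NULL, '2023-01-01 01:00:00+00');
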
.unreleased/bugfix_6729 (new file)
@@ -0,0 +1 @@
+Fixes: #6729 Fix NULL start value handling in CAgg refresh

@@ -111,13 +111,20 @@ get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json
 }

 int64
-policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, bool *start_isnull)
+policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim,
+                                      const Jsonb *config, bool *start_isnull)
 {
     int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, start_isnull);

     /* interpret NULL as min value for that type */
     if (*start_isnull)
-        return ts_time_get_min(ts_dimension_get_partition_type(dim));
+    {
+        Oid type = ts_dimension_get_partition_type(dim);
+
+        return ts_continuous_agg_bucket_width_variable(cagg) ? ts_time_get_nobegin_or_min(type) :
+                                                                ts_time_get_min(type);
+    }

     return res;
 }

@@ -16,8 +16,8 @@ extern Datum policy_refresh_cagg_check(PG_FUNCTION_ARGS);
 extern Datum policy_refresh_cagg_remove(PG_FUNCTION_ARGS);

 int32 policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config);
-int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config,
-                                            bool *start_isnull);
+int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim,
+                                            const Jsonb *config, bool *start_isnull);
 int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
                                           bool *end_isnull);
 bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,

@@ -401,9 +401,11 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
                  errmsg("configuration materialization hypertable id %d not found",
                         materialization_id)));

+    ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);
+
     open_dim = get_open_dimension_for_hypertable(mat_ht, true);
     dim_type = ts_dimension_get_partition_type(open_dim);
-    refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config, &start_isnull);
+    refresh_start = policy_refresh_cagg_get_refresh_start(cagg, open_dim, config, &start_isnull);
     refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config, &end_isnull);

     if (refresh_start >= refresh_end)

@@ -420,7 +422,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
     policy_data->refresh_window.type = dim_type;
     policy_data->refresh_window.start = refresh_start;
     policy_data->refresh_window.end = refresh_end;
-    policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);
+    policy_data->cagg = cagg;
     policy_data->start_is_null = start_isnull;
     policy_data->end_is_null = end_isnull;
 }

@@ -1189,7 +1189,7 @@ WHERE cagg_id = :cond_10_id;

 -- should trigger two individual refreshes
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
--- Allow at most 5 individual invalidations per refreshe
+-- Allow at most 5 individual invalidations per refresh
 SET timescaledb.materializations_per_refresh_window=5;
 -- Insert into every second bucket
 INSERT INTO conditions VALUES (20, 1, 1.0);

@@ -1226,6 +1226,7 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200);
 WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL: Expected an integer but current value is "-".
 \set VERBOSITY terse
 RESET timescaledb.materializations_per_refresh_window;
+-- Test refresh with undefined invalidation threshold and variable sized buckets
 CREATE TABLE timestamp_ht (
     time timestamptz NOT NULL,

@@ -1275,3 +1276,41 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts
 FROM temperature_4h_2
 GROUP BY 1 ORDER BY 1;
 NOTICE: continuous aggregate "temperature_1month_hierarchical_ts" is already up-to-date
+---------------------------------------------------------------------
+--- Issue 5474
+---------------------------------------------------------------------
+CREATE TABLE i5474 (
+    time timestamptz NOT NULL,
+    sensor_id integer NOT NULL,
+    cpu double precision NOT NULL,
+    temperature double precision NOT NULL);
+SELECT create_hypertable('i5474','time');
+  create_hypertable
+---------------------
+ (16,public,i5474,t)
+(1 row)
+
+CREATE MATERIALIZED VIEW i5474_summary_daily
+WITH (timescaledb.continuous) AS
+SELECT
+    time_bucket('1 hour', time, 'AWST') AS bucket,
+    sensor_id,
+    avg(cpu) AS avg_cpu
+FROM i5474
+GROUP BY bucket, sensor_id;
+NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
+SELECT add_continuous_aggregate_policy('i5474_summary_daily',
+    start_offset => NULL,
+    end_offset => INTERVAL '10 MINUTES',
+    schedule_interval => INTERVAL '1 MINUTE'
+) new_job_id \gset
+-- Check that start_offset = NULL is handled properly by the refresh job...
+CALL run_job(:new_job_id);
+-- ...and the CAgg can be refreshed afterward
+CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00');
+NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
+INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0);
+CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
+-- CAgg should be up-to-date now
+CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
+NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date

@@ -683,7 +683,7 @@ WHERE cagg_id = :cond_10_id;
 -- should trigger two individual refreshes
 CALL refresh_continuous_aggregate('cond_10', 0, 200);

--- Allow at most 5 individual invalidations per refreshe
+-- Allow at most 5 individual invalidations per refresh
 SET timescaledb.materializations_per_refresh_window=5;

 -- Insert into every second bucket

@@ -719,6 +719,7 @@ SET timescaledb.materializations_per_refresh_window='-';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
 \set VERBOSITY terse
 RESET timescaledb.materializations_per_refresh_window;

+-- Test refresh with undefined invalidation threshold and variable sized buckets
 CREATE TABLE timestamp_ht (

@@ -765,3 +766,40 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts
 SELECT time_bucket('1 month', bucket_4h, 'Europe/Berlin'), avg(average)
 FROM temperature_4h_2
 GROUP BY 1 ORDER BY 1;
+
+---------------------------------------------------------------------
+--- Issue 5474
+---------------------------------------------------------------------
+CREATE TABLE i5474 (
+    time timestamptz NOT NULL,
+    sensor_id integer NOT NULL,
+    cpu double precision NOT NULL,
+    temperature double precision NOT NULL);
+
+SELECT create_hypertable('i5474','time');
+
+CREATE MATERIALIZED VIEW i5474_summary_daily
+WITH (timescaledb.continuous) AS
+SELECT
+    time_bucket('1 hour', time, 'AWST') AS bucket,
+    sensor_id,
+    avg(cpu) AS avg_cpu
+FROM i5474
+GROUP BY bucket, sensor_id;
+
+SELECT add_continuous_aggregate_policy('i5474_summary_daily',
+    start_offset => NULL,
+    end_offset => INTERVAL '10 MINUTES',
+    schedule_interval => INTERVAL '1 MINUTE'
+) new_job_id \gset
+
+-- Check that start_offset = NULL is handled properly by the refresh job...
+CALL run_job(:new_job_id);
+
+-- ...and the CAgg can be refreshed afterward
+CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00');
+INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0);
+CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
+
+-- CAgg should be up-to-date now
+CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');