mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 10:33:27 +08:00
Create low end invalidation when updating caggs
This change will add an invalidation to the materialization_invalidation_log for any region earlier than the ignore_invalidation_older_than parameter when updating a continuous aggregate to 2.0. This is needed as we do not record invalidations in this region prior to 2.0 and there is no way to ensure the aggregate is up to date within this range. Fixes #2450
This commit is contained in:
parent
ef7f21df6d
commit
0703822a83
@ -215,28 +215,23 @@ SET
|
|||||||
proc_schema = '_timescaledb_internal',
|
proc_schema = '_timescaledb_internal',
|
||||||
proc_name = 'policy_refresh_continuous_aggregate',
|
proc_name = 'policy_refresh_continuous_aggregate',
|
||||||
job_type = 'custom',
|
job_type = 'custom',
|
||||||
config = jsonb_build_object('mat_hypertable_id', cagg.mat_hypertable_id, 'start_interval',
|
config =
|
||||||
CASE WHEN cagg.ignore_invalidation_older_than IS NULL
|
CASE WHEN ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype)
|
||||||
THEN NULL
|
THEN
|
||||||
WHEN cagg.ignore_invalidation_older_than = 9223372036854775807
|
jsonb_build_object('mat_hypertable_id', cagg.mat_hypertable_id, 'start_interval',
|
||||||
AND
|
CASE WHEN cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807
|
||||||
ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype)
|
THEN NULL
|
||||||
THEN NULL
|
ELSE ts_tmp_get_interval(cagg.ignore_invalidation_older_than)::TEXT
|
||||||
ELSE
|
END
|
||||||
CASE WHEN
|
, 'end_interval', ts_tmp_get_interval(cagg.refresh_lag)::TEXT)
|
||||||
ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype)
|
ELSE
|
||||||
THEN ts_tmp_get_interval(cagg.ignore_invalidation_older_than)::TEXT
|
jsonb_build_object('mat_hypertable_id', cagg.mat_hypertable_id, 'start_interval',
|
||||||
ELSE cagg.ignore_invalidation_older_than::TEXT
|
CASE WHEN cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807
|
||||||
END
|
THEN NULL
|
||||||
END,
|
ELSE cagg.ignore_invalidation_older_than::BIGINT
|
||||||
'end_interval',
|
END
|
||||||
CASE
|
, 'end_interval', cagg.refresh_lag::BIGINT)
|
||||||
WHEN
|
END,
|
||||||
ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype)
|
|
||||||
THEN ts_tmp_get_interval(cagg.refresh_lag)::TEXT
|
|
||||||
ELSE cagg.refresh_lag::TEXT
|
|
||||||
END
|
|
||||||
),
|
|
||||||
hypertable_id = cagg.mat_hypertable_id,
|
hypertable_id = cagg.mat_hypertable_id,
|
||||||
owner = (
|
owner = (
|
||||||
SELECT relowner::regrole::text
|
SELECT relowner::regrole::text
|
||||||
@ -316,8 +311,46 @@ SELECT materialization_id, BIGINT '-9223372036854775808', watermark, 92233720368
|
|||||||
FROM _timescaledb_catalog.continuous_aggs_completed_threshold;
|
FROM _timescaledb_catalog.continuous_aggs_completed_threshold;
|
||||||
-- Also handle continuous aggs that have never been run
|
-- Also handle continuous aggs that have never been run
|
||||||
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
|
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
|
||||||
SELECT (SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg EXCEPT SELECT materialization_id FROM _timescaledb_catalog.continuous_aggs_completed_threshold),
|
SELECT unrun_cagg.id, -9223372036854775808, -9223372036854775808, 9223372036854775807 FROM
|
||||||
-9223372036854775808, -9223372036854775808, 9223372036854775807;
|
(SELECT mat_hypertable_id id FROM _timescaledb_catalog.continuous_agg EXCEPT SELECT materialization_id FROM _timescaledb_catalog.continuous_aggs_completed_threshold) as unrun_cagg;
|
||||||
|
|
||||||
|
-- Also add an invalidation from [-infinity, now() - ignore_invaliation_older_than] to cover any missed invalidations
|
||||||
|
-- For NULL or inf ignore_invalidations_older_than, use julian 0 for consistency with 2.0 (for int tables, use INT_MIN - 1)
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
cagg _timescaledb_catalog.continuous_agg%ROWTYPE;
|
||||||
|
dimrow _timescaledb_catalog.dimension%ROWTYPE;
|
||||||
|
end_val bigint;
|
||||||
|
getendval text;
|
||||||
|
BEGIN
|
||||||
|
FOR cagg in SELECT * FROM _timescaledb_catalog.continuous_agg
|
||||||
|
LOOP
|
||||||
|
SELECT * INTO dimrow
|
||||||
|
FROM _timescaledb_catalog.dimension dim
|
||||||
|
WHERE dim.hypertable_id = cagg.raw_hypertable_id AND dim.num_slices IS NULL AND dim.interval_length IS NOT NULL;
|
||||||
|
|
||||||
|
IF dimrow.column_type IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype)
|
||||||
|
THEN
|
||||||
|
IF cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807
|
||||||
|
THEN
|
||||||
|
end_val := -210866803200000001;
|
||||||
|
ELSE
|
||||||
|
end_val := (extract(epoch from now()) * 1000000 - cagg.ignore_invalidation_older_than)::int8;
|
||||||
|
END IF;
|
||||||
|
ELSE
|
||||||
|
IF cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807
|
||||||
|
THEN
|
||||||
|
end_val := -2147483649;
|
||||||
|
ELSE
|
||||||
|
getendval := format('SELECT %s.%s() - %s', dimrow.integer_now_func_schema, dimrow.integer_now_func, cagg.ignore_invalidation_older_than);
|
||||||
|
EXECUTE getendval INTO end_val;
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
|
||||||
|
VALUES (cagg.mat_hypertable_id, -9223372036854775808, -9223372036854775808, end_val);
|
||||||
|
END LOOP;
|
||||||
|
END $$;
|
||||||
|
|
||||||
-- drop completed_threshold table, which is no longer used
|
-- drop completed_threshold table, which is no longer used
|
||||||
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_completed_threshold;
|
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_completed_threshold;
|
||||||
|
@ -16,6 +16,9 @@ SELECT view_name, schedule_interval, materialized_only, materialization_hypertab
|
|||||||
|
|
||||||
SELECT maxtemp FROM mat_ignoreinval ORDER BY 1;
|
SELECT maxtemp FROM mat_ignoreinval ORDER BY 1;
|
||||||
|
|
||||||
|
SELECT materialization_id FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
|
||||||
|
WHERE lowest_modified_value = -9223372036854775808 ORDER BY 1;
|
||||||
|
|
||||||
SELECT count(*) FROM mat_inval;
|
SELECT count(*) FROM mat_inval;
|
||||||
CALL refresh_continuous_aggregate('mat_inval',NULL,NULL);
|
CALL refresh_continuous_aggregate('mat_inval',NULL,NULL);
|
||||||
SELECT count(*) FROM mat_inval;
|
SELECT count(*) FROM mat_inval;
|
||||||
|
@ -328,3 +328,69 @@ INSERT INTO inval_test
|
|||||||
SELECT generate_series('2118-12-01 00:00'::timestamp, '2118-12-20 00:00'::timestamp, '1 day'), 'POR', generate_series(135.25, 140.0, 0.25);
|
SELECT generate_series('2118-12-01 00:00'::timestamp, '2118-12-20 00:00'::timestamp, '1 day'), 'POR', generate_series(135.25, 140.0, 0.25);
|
||||||
INSERT INTO inval_test
|
INSERT INTO inval_test
|
||||||
SELECT generate_series('2118-12-01 00:00'::timestamp, '2118-12-20 00:00'::timestamp, '1 day'), 'NYC', generate_series(131.0, 150.0, 1.0);
|
SELECT generate_series('2118-12-01 00:00'::timestamp, '2118-12-20 00:00'::timestamp, '1 day'), 'NYC', generate_series(131.0, 150.0, 1.0);
|
||||||
|
|
||||||
|
-- Add an integer base table to ensure we handle it correctly
|
||||||
|
CREATE TABLE int_time_test(timeval integer, col1 integer, col2 integer);
|
||||||
|
select create_hypertable('int_time_test', 'timeval', chunk_time_interval=> 2);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(timeval), 0) FROM int_time_test $$;
|
||||||
|
SELECT set_integer_now_func('int_time_test', 'integer_now_test');
|
||||||
|
|
||||||
|
INSERT INTO int_time_test VALUES
|
||||||
|
(10, - 4, 1), (11, - 3, 5), (12, - 3, 7), (13, - 3, 9), (14,-4, 11),
|
||||||
|
(15, -4, 22), (16, -4, 23);
|
||||||
|
|
||||||
|
DO LANGUAGE PLPGSQL $$
|
||||||
|
DECLARE
|
||||||
|
ts_version TEXT;
|
||||||
|
BEGIN
|
||||||
|
SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb';
|
||||||
|
|
||||||
|
IF ts_version < '2.0.0' THEN
|
||||||
|
CREATE VIEW mat_inttime
|
||||||
|
WITH ( timescaledb.continuous, timescaledb.materialized_only=true,
|
||||||
|
timescaledb.ignore_invalidation_older_than = 5,
|
||||||
|
timescaledb.refresh_lag = 2,
|
||||||
|
timescaledb.refresh_interval='12 hours')
|
||||||
|
AS
|
||||||
|
SELECT time_bucket( 2, timeval), COUNT(col1)
|
||||||
|
FROM int_time_test
|
||||||
|
GROUP BY 1;
|
||||||
|
|
||||||
|
CREATE VIEW mat_inttime2
|
||||||
|
WITH ( timescaledb.continuous, timescaledb.materialized_only=true,
|
||||||
|
timescaledb.refresh_lag = 2,
|
||||||
|
timescaledb.refresh_interval='12 hours')
|
||||||
|
AS
|
||||||
|
SELECT time_bucket( 2, timeval), COUNT(col1)
|
||||||
|
FROM int_time_test
|
||||||
|
GROUP BY 1;
|
||||||
|
|
||||||
|
ELSE
|
||||||
|
CREATE MATERIALIZED VIEW mat_inttime
|
||||||
|
WITH ( timescaledb.continuous, timescaledb.materialized_only=true )
|
||||||
|
AS
|
||||||
|
SELECT time_bucket( 2, timeval), COUNT(col1)
|
||||||
|
FROM int_time_test
|
||||||
|
GROUP BY 1 WITH NO DATA;
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW mat_inttime2
|
||||||
|
WITH ( timescaledb.continuous, timescaledb.materialized_only=true )
|
||||||
|
AS
|
||||||
|
SELECT time_bucket( 2, timeval), COUNT(col1)
|
||||||
|
FROM int_time_test
|
||||||
|
GROUP BY 1 WITH NO DATA;
|
||||||
|
|
||||||
|
PERFORM add_continuous_aggregate_policy('mat_inttime', 5, 2, '12 hours');
|
||||||
|
PERFORM add_continuous_aggregate_policy('mat_inttime2', NULL, 2, '12 hours');
|
||||||
|
END IF;
|
||||||
|
END $$;
|
||||||
|
|
||||||
|
|
||||||
|
\if :has_refresh_mat_view
|
||||||
|
REFRESH MATERIALIZED VIEW mat_inttime;
|
||||||
|
REFRESH MATERIALIZED VIEW mat_inttime2;
|
||||||
|
\else
|
||||||
|
CALL refresh_continuous_aggregate('mat_inttime',NULL,NULL);
|
||||||
|
CALL refresh_continuous_aggregate('mat_inttime2',NULL,NULL);
|
||||||
|
\endif
|
||||||
|
Loading…
x
Reference in New Issue
Block a user