diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index 58ec021fb..e38e9c89b 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -215,28 +215,23 @@ SET proc_schema = '_timescaledb_internal', proc_name = 'policy_refresh_continuous_aggregate', job_type = 'custom', - config = jsonb_build_object('mat_hypertable_id', cagg.mat_hypertable_id, 'start_interval', - CASE WHEN cagg.ignore_invalidation_older_than IS NULL - THEN NULL - WHEN cagg.ignore_invalidation_older_than = 9223372036854775807 - AND - ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype) - THEN NULL - ELSE - CASE WHEN - ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype) - THEN ts_tmp_get_interval(cagg.ignore_invalidation_older_than)::TEXT - ELSE cagg.ignore_invalidation_older_than::TEXT - END - END, - 'end_interval', - CASE - WHEN - ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype) - THEN ts_tmp_get_interval(cagg.refresh_lag)::TEXT - ELSE cagg.refresh_lag::TEXT - END -), + config = + CASE WHEN ts_tmp_get_time_type( cagg.raw_hypertable_id ) IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype) + THEN + jsonb_build_object('mat_hypertable_id', cagg.mat_hypertable_id, 'start_interval', + CASE WHEN cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807 + THEN NULL + ELSE ts_tmp_get_interval(cagg.ignore_invalidation_older_than)::TEXT + END + , 'end_interval', ts_tmp_get_interval(cagg.refresh_lag)::TEXT) + ELSE + jsonb_build_object('mat_hypertable_id', cagg.mat_hypertable_id, 'start_interval', + CASE WHEN cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807 + THEN NULL + ELSE cagg.ignore_invalidation_older_than::BIGINT + END + , 'end_interval', cagg.refresh_lag::BIGINT) + END, hypertable_id = cagg.mat_hypertable_id, owner = ( SELECT relowner::regrole::text @@ -316,8 +311,46 @@ SELECT materialization_id, BIGINT '-9223372036854775808', watermark, 92233720368 FROM _timescaledb_catalog.continuous_aggs_completed_threshold; -- Also handle continuous aggs that have never been run INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log -SELECT (SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg EXCEPT SELECT materialization_id FROM _timescaledb_catalog.continuous_aggs_completed_threshold), --9223372036854775808, -9223372036854775808, 9223372036854775807; +SELECT unrun_cagg.id, -9223372036854775808, -9223372036854775808, 9223372036854775807 FROM +(SELECT mat_hypertable_id id FROM _timescaledb_catalog.continuous_agg EXCEPT SELECT materialization_id FROM _timescaledb_catalog.continuous_aggs_completed_threshold) as unrun_cagg; + +-- Also add an invalidation from [-infinity, now() - ignore_invaliation_older_than] to cover any missed invalidations +-- For NULL or inf ignore_invalidations_older_than, use julian 0 for consistency with 2.0 (for int tables, use INT_MIN - 1) +DO $$ +DECLARE + cagg _timescaledb_catalog.continuous_agg%ROWTYPE; + dimrow _timescaledb_catalog.dimension%ROWTYPE; + end_val bigint; + getendval text; +BEGIN + FOR cagg in SELECT * FROM _timescaledb_catalog.continuous_agg + LOOP + SELECT * INTO dimrow + FROM _timescaledb_catalog.dimension dim + WHERE dim.hypertable_id = cagg.raw_hypertable_id AND dim.num_slices IS NULL AND dim.interval_length IS NOT NULL; + + IF dimrow.column_type IN ('TIMESTAMP'::regtype, 'DATE'::regtype, 'TIMESTAMPTZ'::regtype) + THEN + IF cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807 + THEN + end_val := -210866803200000001; + ELSE + end_val := (extract(epoch from now()) * 1000000 - cagg.ignore_invalidation_older_than)::int8; + END IF; + ELSE + IF cagg.ignore_invalidation_older_than IS NULL OR cagg.ignore_invalidation_older_than = 9223372036854775807 + THEN + end_val := -2147483649; + ELSE + getendval := format('SELECT %s.%s() - %s', dimrow.integer_now_func_schema, dimrow.integer_now_func, cagg.ignore_invalidation_older_than); + EXECUTE getendval INTO end_val; + END IF; + END IF; + + INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + VALUES (cagg.mat_hypertable_id, -9223372036854775808, -9223372036854775808, end_val); + END LOOP; +END $$; -- drop completed_threshold table, which is no longer used ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.continuous_aggs_completed_threshold; diff --git a/test/sql/updates/post.continuous_aggs.v2.sql b/test/sql/updates/post.continuous_aggs.v2.sql index 2fb56d748..23dcc8d79 100644 --- a/test/sql/updates/post.continuous_aggs.v2.sql +++ b/test/sql/updates/post.continuous_aggs.v2.sql @@ -16,6 +16,9 @@ SELECT view_name, schedule_interval, materialized_only, materialization_hypertab SELECT maxtemp FROM mat_ignoreinval ORDER BY 1; +SELECT materialization_id FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log +WHERE lowest_modified_value = -9223372036854775808 ORDER BY 1; + SELECT count(*) FROM mat_inval; CALL refresh_continuous_aggregate('mat_inval',NULL,NULL); SELECT count(*) FROM mat_inval; diff --git a/test/sql/updates/setup.continuous_aggs.v2.sql b/test/sql/updates/setup.continuous_aggs.v2.sql index cb1ac9df0..59db61dfe 100644 --- a/test/sql/updates/setup.continuous_aggs.v2.sql +++ b/test/sql/updates/setup.continuous_aggs.v2.sql @@ -328,3 +328,69 @@ INSERT INTO inval_test SELECT generate_series('2118-12-01 00:00'::timestamp, '2118-12-20 00:00'::timestamp, '1 day'), 'POR', generate_series(135.25, 140.0, 0.25); INSERT INTO inval_test SELECT generate_series('2118-12-01 00:00'::timestamp, '2118-12-20 00:00'::timestamp, '1 day'), 'NYC', generate_series(131.0, 150.0, 1.0); + +-- Add an integer base table to ensure we handle it correctly +CREATE TABLE int_time_test(timeval integer, col1 integer, col2 integer); +select create_hypertable('int_time_test', 'timeval', chunk_time_interval=> 2); + +CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(timeval), 0) FROM int_time_test $$; +SELECT set_integer_now_func('int_time_test', 'integer_now_test'); + +INSERT INTO int_time_test VALUES +(10, - 4, 1), (11, - 3, 5), (12, - 3, 7), (13, - 3, 9), (14,-4, 11), +(15, -4, 22), (16, -4, 23); + +DO LANGUAGE PLPGSQL $$ +DECLARE + ts_version TEXT; +BEGIN + SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb'; + + IF ts_version < '2.0.0' THEN + CREATE VIEW mat_inttime + WITH ( timescaledb.continuous, timescaledb.materialized_only=true, + timescaledb.ignore_invalidation_older_than = 5, + timescaledb.refresh_lag = 2, + timescaledb.refresh_interval='12 hours') + AS + SELECT time_bucket( 2, timeval), COUNT(col1) + FROM int_time_test + GROUP BY 1; + + CREATE VIEW mat_inttime2 + WITH ( timescaledb.continuous, timescaledb.materialized_only=true, + timescaledb.refresh_lag = 2, + timescaledb.refresh_interval='12 hours') + AS + SELECT time_bucket( 2, timeval), COUNT(col1) + FROM int_time_test + GROUP BY 1; + + ELSE + CREATE MATERIALIZED VIEW mat_inttime + WITH ( timescaledb.continuous, timescaledb.materialized_only=true ) + AS + SELECT time_bucket( 2, timeval), COUNT(col1) + FROM int_time_test + GROUP BY 1 WITH NO DATA; + + CREATE MATERIALIZED VIEW mat_inttime2 + WITH ( timescaledb.continuous, timescaledb.materialized_only=true ) + AS + SELECT time_bucket( 2, timeval), COUNT(col1) + FROM int_time_test + GROUP BY 1 WITH NO DATA; + + PERFORM add_continuous_aggregate_policy('mat_inttime', 5, 2, '12 hours'); + PERFORM add_continuous_aggregate_policy('mat_inttime2', NULL, 2, '12 hours'); + END IF; +END $$; + + +\if :has_refresh_mat_view +REFRESH MATERIALIZED VIEW mat_inttime; +REFRESH MATERIALIZED VIEW mat_inttime2; +\else +CALL refresh_continuous_aggregate('mat_inttime',NULL,NULL); +CALL refresh_continuous_aggregate('mat_inttime2',NULL,NULL); +\endif