1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-16 02:23:49 +08:00
timescaledb/tsl/test/expected/cagg_refresh_policy_incremental.out
Alexander Kuzmenkov c4e1680c5a
cleanup
2025-03-18 11:53:16 +01:00

604 lines
32 KiB
Plaintext

-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION ts_bgw_params_destroy() RETURNS VOID
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION ts_bgw_params_reset_time(set_time BIGINT = 0, wait BOOLEAN = false) RETURNS VOID
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
-- Create a user with specific timezone and mock time
CREATE ROLE test_cagg_refresh_policy_user WITH LOGIN;
ALTER ROLE test_cagg_refresh_policy_user SET timezone TO 'UTC';
ALTER ROLE test_cagg_refresh_policy_user SET timescaledb.current_timestamp_mock TO '2025-03-11 00:00:00+00';
GRANT ALL ON SCHEMA public TO test_cagg_refresh_policy_user;
\c :TEST_DBNAME test_cagg_refresh_policy_user
CREATE TABLE public.bgw_log(
msg_no INT,
mock_time BIGINT,
application_name TEXT,
msg TEXT
);
CREATE VIEW sorted_bgw_log AS
SELECT
msg_no,
mock_time,
application_name,
regexp_replace(regexp_replace(msg, '(Wait until|started at|execution time) [0-9]+(\.[0-9]+)?', '\1 (RANDOM)', 'g'), 'background worker "[^"]+"','connection') AS msg
FROM
bgw_log
ORDER BY
mock_time,
application_name COLLATE "C",
msg_no;
CREATE TABLE public.bgw_dsm_handle_store(
handle BIGINT
);
INSERT INTO public.bgw_dsm_handle_store VALUES (0);
SELECT ts_bgw_params_create();
ts_bgw_params_create
----------------------
(1 row)
CREATE TABLE conditions (
time TIMESTAMP WITH TIME ZONE NOT NULL,
device_id INTEGER,
temperature NUMERIC
);
SELECT FROM create_hypertable('conditions', by_range('time'));
--
(1 row)
INSERT INTO conditions
SELECT
t, d, 10
FROM
generate_series(
'2025-02-05 00:00:00+00',
'2025-03-05 00:00:00+00',
'1 hour'::interval) AS t,
generate_series(1,5) AS d;
CREATE MATERIALIZED VIEW conditions_by_day
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT
time_bucket('1 day', time),
device_id,
count(*),
min(temperature),
max(temperature),
avg(temperature),
sum(temperature)
FROM
conditions
GROUP BY
1, 2
WITH NO DATA;
SELECT
add_continuous_aggregate_policy(
'conditions_by_day',
start_offset => NULL,
end_offset => NULL,
schedule_interval => INTERVAL '1 h',
buckets_per_batch => 10
) AS job_id \gset
SELECT
config
FROM
timescaledb_information.jobs
WHERE
job_id = :'job_id' \gset
SELECT ts_bgw_params_reset_time(0, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-----------+--------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Registered new background worker
2 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 0 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sat Mar 01 00:00:00 2025 UTC, Thu Mar 06 00:00:00 2025 UTC ] (batch 1 of 4)
1 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
2 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 25 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 0 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Wed Feb 19 00:00:00 2025 UTC, Sat Mar 01 00:00:00 2025 UTC ] (batch 2 of 4)
4 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
5 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
6 | 0 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Feb 09 00:00:00 2025 UTC, Wed Feb 19 00:00:00 2025 UTC ] (batch 3 of 4)
7 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
8 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
9 | 0 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Mon Nov 24 00:00:00 4714 UTC BC, Sun Feb 09 00:00:00 2025 UTC ] (batch 4 of 4)
10 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
11 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 20 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(15 rows)
CREATE MATERIALIZED VIEW conditions_by_day_manual_refresh
WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS
SELECT
time_bucket('1 day', time),
device_id,
count(*),
min(temperature),
max(temperature),
avg(temperature),
sum(temperature)
FROM
conditions
GROUP BY
1, 2
WITH NO DATA;
CALL refresh_continuous_aggregate('conditions_by_day_manual_refresh', NULL, NULL);
SELECT count(*) FROM conditions_by_day;
count
-------
145
(1 row)
SELECT count(*) FROM conditions_by_day_manual_refresh;
count
-------
145
(1 row)
-- Should have no differences
SELECT
count(*) > 0 AS has_diff
FROM
((SELECT * FROM conditions_by_day_manual_refresh ORDER BY 1, 2)
EXCEPT
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
has_diff
----------
f
(1 row)
TRUNCATE bgw_log, conditions_by_day;
SELECT
config
FROM
alter_job(
:'job_id',
config => jsonb_set(:'config', '{max_batches_per_execution}', '2')
);
config
-----------------------------------------------------------------------------------------------------------------------------
{"end_offset": null, "start_offset": null, "buckets_per_batch": 10, "mat_hypertable_id": 2, "max_batches_per_execution": 2}
(1 row)
-- advance time by 1h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '1 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+------------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 3600000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 3600000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sat Mar 01 00:00:00 2025 UTC, Thu Mar 06 00:00:00 2025 UTC ] (batch 1 of 4)
1 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
2 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 25 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Wed Feb 19 00:00:00 2025 UTC, Sat Mar 01 00:00:00 2025 UTC ] (batch 2 of 4)
4 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
5 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
6 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | reached maximum number of batches per execution (2), batches not processed (2)
(9 rows)
SELECT count(*) FROM conditions_by_day;
count
-------
75
(1 row)
SELECT count(*) FROM conditions_by_day_manual_refresh;
count
-------
145
(1 row)
-- Should have differences
SELECT
count(*) > 0 AS has_diff
FROM
((SELECT * FROM conditions_by_day_manual_refresh ORDER BY 1, 2)
EXCEPT
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
has_diff
----------
t
(1 row)
-- advance time by 2h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '2 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+------------+--------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 3600000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 3600000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sat Mar 01 00:00:00 2025 UTC, Thu Mar 06 00:00:00 2025 UTC ] (batch 1 of 4)
1 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
2 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 25 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Wed Feb 19 00:00:00 2025 UTC, Sat Mar 01 00:00:00 2025 UTC ] (batch 2 of 4)
4 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
5 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
6 | 3600000000 | Refresh Continuous Aggregate Policy [1000] | reached maximum number of batches per execution (2), batches not processed (2)
0 | 7200000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 7200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 7200000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Feb 09 00:00:00 2025 UTC, Wed Feb 19 00:00:00 2025 UTC ] (batch 1 of 2)
1 | 7200000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
2 | 7200000000 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 7200000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Mon Nov 24 00:00:00 4714 UTC BC, Sun Feb 09 00:00:00 2025 UTC ] (batch 2 of 2)
4 | 7200000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
5 | 7200000000 | Refresh Continuous Aggregate Policy [1000] | inserted 20 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(17 rows)
-- Should have no differences
SELECT
count(*) > 0 AS has_diff
FROM
((SELECT * FROM conditions_by_day_manual_refresh ORDER BY 1, 2)
EXCEPT
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
has_diff
----------
f
(1 row)
-- Set max_batches_per_execution to 10
SELECT
config
FROM
alter_job(
:'job_id',
config => jsonb_set(:'config', '{max_batches_per_execution}', '10')
);
config
------------------------------------------------------------------------------------------------------------------------------
{"end_offset": null, "start_offset": null, "buckets_per_batch": 10, "mat_hypertable_id": 2, "max_batches_per_execution": 10}
(1 row)
TRUNCATE bgw_log;
-- Insert data into the past
INSERT INTO conditions
SELECT
t, d, 10
FROM
generate_series(
'2020-02-05 00:00:00+00',
'2020-03-05 00:00:00+00',
'1 hour'::interval) AS t,
generate_series(1,5) AS d;
-- advance time by 3h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '3 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
-- Should process all four batches in the past
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 10800000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 10800000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sat Feb 29 00:00:00 2020 UTC, Fri Mar 06 00:00:00 2020 UTC ] (batch 1 of 4)
1 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
2 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | inserted 30 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Wed Feb 19 00:00:00 2020 UTC, Sat Feb 29 00:00:00 2020 UTC ] (batch 2 of 4)
4 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
5 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
6 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sun Feb 09 00:00:00 2020 UTC, Wed Feb 19 00:00:00 2020 UTC ] (batch 3 of 4)
7 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
8 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | inserted 50 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
9 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Wed Feb 05 00:00:00 2020 UTC, Sun Feb 09 00:00:00 2020 UTC ] (batch 4 of 4)
10 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
11 | 10800000000 | Refresh Continuous Aggregate Policy [1000] | inserted 20 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(14 rows)
SELECT count(*) FROM conditions_by_day;
count
-------
295
(1 row)
SELECT count(*) FROM conditions_by_day_manual_refresh;
count
-------
145
(1 row)
CALL refresh_continuous_aggregate('conditions_by_day_manual_refresh', NULL, NULL);
SELECT count(*) FROM conditions_by_day;
count
-------
295
(1 row)
SELECT count(*) FROM conditions_by_day_manual_refresh;
count
-------
295
(1 row)
-- Should have no differences
SELECT
count(*) > 0 AS has_diff
FROM
((SELECT * FROM conditions_by_day_manual_refresh ORDER BY 1, 2)
EXCEPT
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
has_diff
----------
f
(1 row)
-- Check invalid configurations
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT
config
FROM
alter_job(
:'job_id',
config => jsonb_set(:'config', '{max_batches_per_execution}', '-1')
);
ERROR: invalid max batches per execution
DETAIL: max_batches_per_execution: -1
HINT: The max batches per execution should be greater than or equal to zero.
SELECT
config
FROM
alter_job(
:'job_id',
config => jsonb_set(:'config', '{buckets_per_batch}', '-1')
);
ERROR: invalid buckets per batch
DETAIL: buckets_per_batch: -1
HINT: The buckets per batch should be greater than or equal to zero.
\set VERBOSITY terse
\set ON_ERROR_STOP 1
-- Truncate all data from the original hypertable
TRUNCATE bgw_log, conditions;
-- advance time by 4h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '4 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
-- Should fallback to single batch processing because there's no data to be refreshed on the original hypertable
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 14400000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 14400000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | no min slice range start for continuous aggregate "public.conditions_by_day", falling back to single batch processing
1 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Mon Nov 24 00:00:00 4714 UTC BC, Thu Mar 06 00:00:00 2025 UTC ]
2 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | deleted 295 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 14400000000 | Refresh Continuous Aggregate Policy [1000] | inserted 0 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(6 rows)
-- Should return zero rows
SELECT count(*) FROM conditions_by_day;
count
-------
0
(1 row)
-- 1 day of data
INSERT INTO conditions
SELECT
t, d, 10
FROM
generate_series(
'2020-02-05 00:00:00+00',
'2020-02-06 00:00:00+00',
'1 hour'::interval) AS t,
generate_series(1,5) AS d;
TRUNCATE bgw_log;
-- advance time by 5h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '5 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
-- Should fallback to single batch processing because the refresh size is too small
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 18000000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 18000000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | only one batch produced for continuous aggregate "public.conditions_by_day", falling back to single batch processing
1 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Wed Feb 05 00:00:00 2020 UTC, Fri Feb 07 00:00:00 2020 UTC ]
2 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 18000000000 | Refresh Continuous Aggregate Policy [1000] | inserted 10 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(6 rows)
-- Should return 10 rows because the bucket width is `1 day` and buckets per batch is `10`
SELECT count(*) FROM conditions_by_day;
count
-------
10
(1 row)
TRUNCATE conditions_by_day, conditions, bgw_log;
-- Less than 1 day of data (smaller than the bucket width)
INSERT INTO conditions
VALUES ('2020-02-05 00:00:00+00', 1, 10);
-- advance time by 6h so that job runs one more time
SELECT ts_bgw_params_reset_time(extract(epoch from interval '6 hour')::bigint * 1000000, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
-- Should fallback to single batch processing because the refresh size is too small
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 21600000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 21600000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | refresh window size (7 days) is smaller than or equal to batch size (10 days), falling back to single batch processing
1 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Mon Nov 24 00:00:00 4714 UTC BC, Thu Mar 06 00:00:00 2025 UTC ]
2 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 21600000000 | Refresh Continuous Aggregate Policy [1000] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
(6 rows)
-- Should return 1 row
SELECT count(*) FROM conditions_by_day;
count
-------
1
(1 row)
SELECT delete_job(:job_id);
delete_job
------------
(1 row)
SELECT
add_continuous_aggregate_policy(
'conditions_by_day',
start_offset => INTERVAL '15 days',
end_offset => NULL,
schedule_interval => INTERVAL '1 h',
buckets_per_batch => 5
) AS job_id \gset
SELECT
add_continuous_aggregate_policy(
'conditions_by_day_manual_refresh',
start_offset => INTERVAL '15 days',
end_offset => NULL,
schedule_interval => INTERVAL '1 h'
) AS job_id \gset
TRUNCATE bgw_log, conditions_by_day, conditions_by_day_manual_refresh, conditions;
INSERT INTO conditions
SELECT
t, d, 10
FROM
generate_series(
'2025-03-11 00:00:00+00'::timestamptz - INTERVAL '30 days',
'2025-03-11 00:00:00+00'::timestamptz,
'1 hour'::interval) AS t,
generate_series(1,5) AS d;
SELECT ts_bgw_params_reset_time(0, true);
ts_bgw_params_reset_time
--------------------------
(1 row)
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
------------------------------------------------------------
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-----------+--------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Registered new background worker
2 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
0 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Tue Mar 11 00:00:00 2025 UTC, Wed Mar 12 00:00:00 2025 UTC ] (batch 1 of 4)
1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 5 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
3 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Thu Mar 06 00:00:00 2025 UTC, Tue Mar 11 00:00:00 2025 UTC ] (batch 2 of 4)
4 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
5 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 25 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
6 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Sat Mar 01 00:00:00 2025 UTC, Thu Mar 06 00:00:00 2025 UTC ] (batch 3 of 4)
7 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
8 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 25 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
9 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "conditions_by_day" in window [ Mon Feb 24 00:00:00 2025 UTC, Sat Mar 01 00:00:00 2025 UTC ] (batch 4 of 4)
10 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
11 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 25 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
0 | 0 | Refresh Continuous Aggregate Policy [1002] | continuous aggregate refresh (individual invalidation) on "conditions_by_day_manual_refresh" in window [ Mon Feb 24 00:00:00 2025 UTC, Wed Mar 12 00:00:00 2025 UTC ]
1 | 0 | Refresh Continuous Aggregate Policy [1002] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
2 | 0 | Refresh Continuous Aggregate Policy [1002] | inserted 80 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
(18 rows)
-- Both continuous aggregates should have the same data
SELECT count(*) FROM conditions_by_day;
count
-------
80
(1 row)
SELECT count(*) FROM conditions_by_day_manual_refresh;
count
-------
80
(1 row)
-- Should have no differences
SELECT
count(*) > 0 AS has_diff
FROM
((SELECT * FROM conditions_by_day_manual_refresh ORDER BY 1, 2)
EXCEPT
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
has_diff
----------
f
(1 row)
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
REASSIGN OWNED BY test_cagg_refresh_policy_user TO :ROLE_CLUSTER_SUPERUSER;
REVOKE ALL ON SCHEMA public FROM test_cagg_refresh_policy_user;
DROP ROLE test_cagg_refresh_policy_user;