mirror of
https://github.com/timescale/timescaledb.git
synced 2025-06-01 18:56:47 +08:00
Several scheduler regression tests create the plpgsql function `wait_for_job_to_run` with the same purpose of waiting for a given job to execute or fail, so refactor the regression tests by adding it to the testsupport.sql library.
714 lines
36 KiB
Plaintext
714 lines
36 KiB
Plaintext
-- This file and its contents are licensed under the Timescale License.
|
|
-- Please see the included NOTICE for copyright information and
|
|
-- LICENSE-TIMESCALE for a copy of the license.
|
|
--
|
|
-- Setup
|
|
--
|
|
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
|
|
-- Test-support entry points for the mock background-worker scheduler.
-- Each one is declared as a C-language function loaded from MODULE_PATHNAME
-- and returns VOID; the implementations are not visible in this file.
--
-- NOTE(review): judging by the name, this starts the test scheduler and blocks
-- until it has finished -- confirm against the C source. Both arguments are
-- optional (defaults -1 and 0); this file calls it as (25), (25, 25), (50, 50).
CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
-- NOTE(review): presumably the non-blocking counterpart of the function above
-- (start only, no wait) -- confirm. This file passes a timeout computed as
-- seconds * 1000 and a mock_start_time of 0.
CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
-- NOTE(review): presumably blocks until a scheduler started with
-- ts_bgw_db_scheduler_test_run has exited -- confirm. Takes no arguments.
CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_wait_for_scheduler_finish() RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
-- NOTE(review): looks like it sets up the shared mock-scheduler parameters;
-- it is called once in this file, after bgw_dsm_handle_store is populated -- confirm.
CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
-- NOTE(review): presumably the teardown counterpart of ts_bgw_params_create;
-- it is declared here but not called in the visible part of the file.
CREATE OR REPLACE FUNCTION ts_bgw_params_destroy() RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
-- Sets the mock clock. Callers in this file pass set_time as
-- epoch seconds * 1000000 (e.g. 12 hours => 43200000000), or omit both
-- arguments to fall back to the defaults (0, false).
-- NOTE(review): the exact meaning of "wait" is not visible here -- confirm.
CREATE OR REPLACE FUNCTION ts_bgw_params_reset_time(set_time BIGINT = 0, wait BOOLEAN = false) RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
--test that this all works under the community license
|
|
ALTER DATABASE :TEST_DBNAME SET timescaledb.license_key='Community';
|
|
--create a function with no permissions to execute
|
|
-- Returns the constant 10. EXECUTE on this function is revoked from PUBLIC
-- right after it is created, so it serves as a function that ordinary roles
-- are not allowed to call.
CREATE FUNCTION get_constant_no_perms()
    RETURNS INTEGER
    IMMUTABLE
    LANGUAGE SQL
AS $BODY$ SELECT 10 $BODY$;
|
|
REVOKE EXECUTE ON FUNCTION get_constant_no_perms() FROM PUBLIC;
|
|
\set WAIT_ON_JOB 0
|
|
\set IMMEDIATELY_SET_UNTIL 1
|
|
\set WAIT_FOR_OTHER_TO_ADVANCE 2
|
|
CREATE OR REPLACE FUNCTION ts_bgw_params_mock_wait_returns_immediately(new_val INTEGER) RETURNS VOID
|
|
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
|
|
-- Remove any default jobs, e.g., telemetry
|
|
DELETE FROM _timescaledb_config.bgw_job WHERE TRUE;
|
|
TRUNCATE _timescaledb_internal.bgw_job_stat;
|
|
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
|
|
CREATE TABLE public.bgw_log(
|
|
msg_no INT,
|
|
mock_time BIGINT,
|
|
application_name TEXT,
|
|
msg TEXT
|
|
);
|
|
-- Deterministic view over bgw_log for comparing scheduler output:
--   * the number that follows "Wait until", "started at" or "execution time"
--     is replaced by "(RANDOM)" everywhere in the message;
--   * the first 'background worker "<name>"' is replaced by 'connection';
--   * rows are ordered by mock_time, application_name (C collation), msg_no.
CREATE VIEW sorted_bgw_log AS
    SELECT
        l.msg_no,
        l.mock_time,
        l.application_name,
        regexp_replace(
            regexp_replace(
                l.msg,
                '(Wait until|started at|execution time) [0-9]+(\.[0-9]+)?',
                '\1 (RANDOM)',
                'g'
            ),
            'background worker "[^"]+"',
            'connection'
        ) AS msg
    FROM bgw_log AS l
    ORDER BY
        l.mock_time,
        l.application_name COLLATE "C",
        l.msg_no;
|
|
CREATE TABLE public.bgw_dsm_handle_store(
|
|
handle BIGINT
|
|
);
|
|
INSERT INTO public.bgw_dsm_handle_store VALUES (0);
|
|
SELECT ts_bgw_params_create();
|
|
ts_bgw_params_create
|
|
----------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM _timescaledb_config.bgw_job;
|
|
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone
|
|
----+------------------+-------------------+-------------+-------------+--------------+-------------+-----------+-------+-----------+----------------+---------------+---------------+--------+--------------+------------+----------
|
|
(0 rows)
|
|
|
|
SELECT * FROM timescaledb_information.job_stats;
|
|
hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures
|
|
-------------------+-----------------+--------+---------------------+------------------------+-----------------+------------+-------------------+------------+------------+-----------------+----------------
|
|
(0 rows)
|
|
|
|
SELECT * FROM _timescaledb_catalog.continuous_agg;
|
|
mat_hypertable_id | raw_hypertable_id | parent_mat_hypertable_id | user_view_schema | user_view_name | partial_view_schema | partial_view_name | direct_view_schema | direct_view_name | materialized_only | finalized
|
|
-------------------+-------------------+--------------------------+------------------+----------------+---------------------+-------------------+--------------------+------------------+-------------------+-----------
|
|
(0 rows)
|
|
|
|
-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes
|
|
GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER;
|
|
WARNING: no privileges were granted for "public"
|
|
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
|
|
CREATE TABLE test_continuous_agg_table(time int, data int);
|
|
SELECT create_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10);
|
|
NOTICE: adding not-null constraint to column "time"
|
|
create_hypertable
|
|
----------------------------------------
|
|
(1,public,test_continuous_agg_table,t)
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION integer_now_test() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table $$;
|
|
SELECT set_integer_now_func('test_continuous_agg_table', 'integer_now_test');
|
|
set_integer_now_func
|
|
----------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE MATERIALIZED VIEW test_continuous_agg_view
|
|
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
|
AS SELECT time_bucket('2', time), SUM(data) as value
|
|
FROM test_continuous_agg_table
|
|
GROUP BY 1 WITH NO DATA;
|
|
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, 4::integer, '12 h'::interval);
|
|
add_continuous_aggregate_policy
|
|
---------------------------------
|
|
1000
|
|
(1 row)
|
|
|
|
SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='test_continuous_agg_table' \gset
|
|
-- min distance from end should be 1
|
|
SELECT mat_hypertable_id, user_view_schema, user_view_name FROM _timescaledb_catalog.continuous_agg;
|
|
mat_hypertable_id | user_view_schema | user_view_name
|
|
-------------------+------------------+--------------------------
|
|
2 | public | test_continuous_agg_view
|
|
(1 row)
|
|
|
|
SELECT mat_hypertable_id, bucket_width FROM _timescaledb_catalog.continuous_aggs_bucket_function;
|
|
mat_hypertable_id | bucket_width
|
|
-------------------+--------------
|
|
2 | 2
|
|
(1 row)
|
|
|
|
SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset
|
|
SELECT id AS job_id FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id \gset
|
|
-- job was created
|
|
SELECT * FROM _timescaledb_config.bgw_job where hypertable_id=:mat_hypertable_id;
|
|
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone
|
|
------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+----------
|
|
1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check |
|
|
(1 row)
|
|
|
|
-- create 10 time buckets
|
|
INSERT INTO test_continuous_agg_table
|
|
SELECT i, i FROM
|
|
(SELECT generate_series(0, 10) as i) AS j;
|
|
-- no stats
|
|
SELECT job_id, next_start, last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
ORDER BY job_id;
|
|
job_id | next_start | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes
|
|
--------+------------+------------+------------------+------------+-----------------+----------------+---------------
|
|
(0 rows)
|
|
|
|
-- no data in view
|
|
SELECT * FROM test_continuous_agg_view ORDER BY 1;
|
|
time_bucket | value
|
|
-------------+-------
|
|
(0 rows)
|
|
|
|
-- run first time
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM sorted_bgw_log;
|
|
msg_no | mock_time | application_name | msg
|
|
--------+-----------+--------------------------------------------+-------------------------------------------------------------------------------------------------------------------
|
|
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 0 | Refresh Continuous Aggregate Policy [1000] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ -2147483648, 6 ]
|
|
1 | 0 | Refresh Continuous Aggregate Policy [1000] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_2"
|
|
2 | 0 | Refresh Continuous Aggregate Policy [1000] | inserted 3 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_2"
|
|
(5 rows)
|
|
|
|
SELECT * FROM _timescaledb_config.bgw_job where id=:job_id;
|
|
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone
|
|
------+--------------------------------------------+-------------------+-------------+-------------+--------------+------------------------+-------------------------------------+-------------------+-----------+----------------+---------------+---------------+-----------------------------------------------------------------+------------------------+-------------------------------------------+----------
|
|
1000 | Refresh Continuous Aggregate Policy [1000] | @ 12 hours | @ 0 | -1 | @ 12 hours | _timescaledb_functions | policy_refresh_continuous_aggregate | default_perm_user | t | f | | 2 | {"end_offset": 4, "start_offset": null, "mat_hypertable_id": 2} | _timescaledb_functions | policy_refresh_continuous_aggregate_check |
|
|
(1 row)
|
|
|
|
-- job ran once, successfully
|
|
SELECT job_id, next_start-last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
where job_id=:job_id;
|
|
job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes
|
|
--------+------------+------------------+------------+-----------------+----------------+---------------
|
|
1000 | @ 12 hours | t | 1 | 1 | 0 | 0
|
|
(1 row)
|
|
|
|
--clear log for next run of scheduler.
|
|
TRUNCATE public.bgw_log;
|
|
-- Poll bgw_log until the scheduler has logged a message of the form
-- "[TESTING] Wait until <anything>, started at <started_at>".
--
-- started_at: value expected after "started at" in the log message.
-- spins:      maximum number of polls (defaults to the psql variable
--             TEST_SPINWAIT_ITERS); the function sleeps 0.1 s after each miss.
-- Returns true as soon as a matching row exists, false if none showed up
-- within "spins" polls.
CREATE FUNCTION wait_for_timer_to_run(started_at INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
DECLARE
    message TEXT;
BEGIN
    -- "%%" is format()'s escape for a literal "%", which LIKE then treats as
    -- a wildcard for the varying "Wait until" value.
    message := format('[TESTING] Wait until %%, started at %s', started_at);
    FOR i IN 1..spins
    LOOP
        -- EXISTS stops at the first matching row instead of counting them all.
        IF EXISTS (SELECT 1 FROM bgw_log WHERE msg LIKE message) THEN
            RETURN true;
        END IF;
        PERFORM pg_sleep(0.1);
    END LOOP;
    RETURN false;
END
$BODY$;
|
|
--make sure the job has run once to start with
|
|
SELECT test.wait_for_job_to_run(:job_id, 1);
|
|
wait_for_job_to_run
|
|
---------------------
|
|
t
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_FOR_OTHER_TO_ADVANCE);
|
|
ts_bgw_params_mock_wait_returns_immediately
|
|
---------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
--start the scheduler on 0 time
|
|
SELECT ts_bgw_params_reset_time(0, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_db_scheduler_test_run(extract(epoch from interval '24 hour')::int * 1000, 0);
|
|
ts_bgw_db_scheduler_test_run
|
|
------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT wait_for_timer_to_run(0);
|
|
wait_for_timer_to_run
|
|
-----------------------
|
|
t
|
|
(1 row)
|
|
|
|
--advance to 12:00 so that it runs one more time; now we know the
|
|
--scheduler has loaded up the job with the old schedule_interval
|
|
SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT test.wait_for_job_to_run(:job_id, 2);
|
|
wait_for_job_to_run
|
|
---------------------
|
|
t
|
|
(1 row)
|
|
|
|
--advance clock 1us to make the scheduler realize the job is done
|
|
SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+1, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
--alter the refresh interval and check if next_start is altered
|
|
SELECT alter_job(:job_id, schedule_interval => '1m', retry_period => '1m');
|
|
alter_job
|
|
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
|
(1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:01:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,)
|
|
(1 row)
|
|
|
|
SELECT job_id, next_start - last_finish as until_next, total_runs
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
WHERE job_id=:job_id;;
|
|
job_id | until_next | total_runs
|
|
--------+------------+------------
|
|
1000 | @ 1 min | 2
|
|
(1 row)
|
|
|
|
--advance to 12:02, job should have run at 12:01
|
|
SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute')::bigint * 1000000), true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT test.wait_for_job_to_run(:job_id, 3);
|
|
wait_for_job_to_run
|
|
---------------------
|
|
t
|
|
(1 row)
|
|
|
|
--next run in 1 minute
|
|
SELECT job_id, next_start-last_finish as until_next, total_runs
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
WHERE job_id=:job_id;
|
|
job_id | until_next | total_runs
|
|
--------+------------+------------
|
|
1000 | @ 1 min | 3
|
|
(1 row)
|
|
|
|
--change next run to be after 30s instead
|
|
SELECT (next_start - '30s'::interval) AS "NEW_NEXT_START"
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
WHERE job_id=:job_id \gset
|
|
SELECT alter_job(:job_id, next_start => :'NEW_NEXT_START');
|
|
alter_job
|
|
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
|
(1000,"@ 1 min","@ 0",-1,"@ 1 min",t,"{""end_offset"": 4, ""start_offset"": null, ""mat_hypertable_id"": 2}","Sat Jan 01 04:02:30 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,)
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_params_reset_time((extract(epoch from interval '12 hour')::bigint * 1000000)+(extract(epoch from interval '2 minute 30 seconds')::bigint * 1000000), true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT test.wait_for_job_to_run(:job_id, 4);
|
|
wait_for_job_to_run
|
|
---------------------
|
|
t
|
|
(1 row)
|
|
|
|
--advance clock to quit scheduler
|
|
SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
select ts_bgw_db_scheduler_test_wait_for_scheduler_finish();
|
|
ts_bgw_db_scheduler_test_wait_for_scheduler_finish
|
|
----------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB);
|
|
ts_bgw_params_mock_wait_returns_immediately
|
|
---------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
TRUNCATE public.bgw_log;
|
|
-- only buckets below 6 are materialized (max time 10 minus end_offset 4)
|
|
SELECT * FROM test_continuous_agg_view ORDER BY 1;
|
|
time_bucket | value
|
|
-------------+-------
|
|
0 | 1
|
|
2 | 5
|
|
4 | 9
|
|
(3 rows)
|
|
|
|
-- invalidations test by running job multiple times
|
|
SELECT ts_bgw_params_reset_time();
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
DROP MATERIALIZED VIEW test_continuous_agg_view;
|
|
NOTICE: drop cascades to table _timescaledb_internal._hyper_2_3_chunk
|
|
CREATE MATERIALIZED VIEW test_continuous_agg_view
|
|
WITH (timescaledb.continuous,
|
|
timescaledb.materialized_only=true)
|
|
AS SELECT time_bucket('2', time), SUM(data) as value
|
|
FROM test_continuous_agg_table
|
|
GROUP BY 1 WITH NO DATA;
|
|
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval);
|
|
add_continuous_aggregate_policy
|
|
---------------------------------
|
|
1001
|
|
(1 row)
|
|
|
|
SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset
|
|
SELECT id AS job_id FROM _timescaledb_config.bgw_job WHERE hypertable_id=:mat_hypertable_id \gset
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM sorted_bgw_log;
|
|
msg_no | mock_time | application_name | msg
|
|
--------+-----------+--------------------------------------------+------------------------------------------------------------------------------------------------------------
|
|
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ -90, 12 ]
|
|
1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
(5 rows)
|
|
|
|
-- job ran once, successfully
-- until_next = next_start - last_finish: the gap between the end of the last
-- run and the next scheduled start, which should equal the policy's 12 hour
-- schedule interval (same expression as the other job-stat checks in this file).
SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes
FROM _timescaledb_internal.bgw_job_stat
where job_id=:job_id;
job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes
--------+------------+------------------+------------+-----------------+----------------+---------------
1001 | @ 12 hours | t | 1 | 1 | 0 | 0
(1 row)
|
|
|
|
-- should have refreshed everything we have so far
|
|
SELECT * FROM test_continuous_agg_view ORDER BY 1;
|
|
time_bucket | value
|
|
-------------+-------
|
|
0 | 1
|
|
2 | 5
|
|
4 | 9
|
|
6 | 13
|
|
8 | 17
|
|
10 | 10
|
|
(6 rows)
|
|
|
|
-- invalidate some data
|
|
UPDATE test_continuous_agg_table
|
|
SET data = 11 WHERE time = 6;
|
|
--advance time by 12h so that job runs one more time
|
|
SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM sorted_bgw_log;
|
|
msg_no | mock_time | application_name | msg
|
|
--------+-------------+--------------------------------------------+------------------------------------------------------------------------------------------------------------
|
|
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ -90, 12 ]
|
|
1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 6, 8 ]
|
|
1 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
2 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
(10 rows)
|
|
|
|
SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
where job_id=:job_id;
|
|
job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes
|
|
--------+------------+------------------+------------+-----------------+----------------+---------------
|
|
1001 | @ 12 hours | t | 2 | 2 | 0 | 0
|
|
(1 row)
|
|
|
|
-- should have updated data for time=6
|
|
SELECT * FROM test_continuous_agg_view ORDER BY 1;
|
|
time_bucket | value
|
|
-------------+-------
|
|
0 | 1
|
|
2 | 5
|
|
4 | 9
|
|
6 | 18
|
|
8 | 17
|
|
10 | 10
|
|
(6 rows)
|
|
|
|
\x on
|
|
--check the information views --
|
|
select view_name, view_owner, materialization_hypertable_schema, materialization_hypertable_name
|
|
from timescaledb_information.continuous_aggregates
|
|
where view_name::text like '%test_continuous_agg_view';
|
|
-[ RECORD 1 ]---------------------+---------------------------
|
|
view_name | test_continuous_agg_view
|
|
view_owner | default_perm_user
|
|
materialization_hypertable_schema | _timescaledb_internal
|
|
materialization_hypertable_name | _materialized_hypertable_3
|
|
|
|
select view_name, view_definition from timescaledb_information.continuous_aggregates
|
|
where view_name::text like '%test_continuous_agg_view';
|
|
-[ RECORD 1 ]---+-------------------------------------------------------------------------
|
|
view_name | test_continuous_agg_view
|
|
view_definition | SELECT time_bucket(2, test_continuous_agg_table."time") AS time_bucket,+
|
|
| sum(test_continuous_agg_table.data) AS value +
|
|
| FROM test_continuous_agg_table +
|
|
| GROUP BY (time_bucket(2, test_continuous_agg_table."time"));
|
|
|
|
select job_status, last_run_duration
|
|
from timescaledb_information.job_stats ps, timescaledb_information.continuous_aggregates cagg
|
|
where cagg.view_name::text like '%test_continuous_agg_view'
|
|
and cagg.materialization_hypertable_name = ps.hypertable_name;
|
|
-[ RECORD 1 ]-----+----------
|
|
job_status | Scheduled
|
|
last_run_duration |
|
|
|
|
\x off
|
|
-- test merged refresh (change data in two chunks)
|
|
UPDATE test_continuous_agg_table SET data = 11;
|
|
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
|
|
ALTER SYSTEM SET timescaledb.materializations_per_refresh_window = 0;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
----------------
|
|
t
|
|
(1 row)
|
|
|
|
SET ROLE :ROLE_DEFAULT_PERM_USER;
|
|
--advance time by 1day so that job runs one more time
|
|
SELECT ts_bgw_params_reset_time(extract(epoch from interval '1day')::bigint * 1000000, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50, 50);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM sorted_bgw_log;
|
|
msg_no | mock_time | application_name | msg
|
|
--------+-------------+--------------------------------------------+------------------------------------------------------------------------------------------------------------
|
|
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 0 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ -90, 12 ]
|
|
1 | 0 | Refresh Continuous Aggregate Policy [1001] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
2 | 0 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view" in window [ 6, 8 ]
|
|
1 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | deleted 1 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
2 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
0 | 86400000000 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 86400000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | continuous aggregate refresh (merged invalidation) on "test_continuous_agg_view" in window [ 0, 12 ]
|
|
1 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | deleted 6 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
2 | 86400000000 | Refresh Continuous Aggregate Policy [1001] | inserted 6 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_3"
|
|
(15 rows)
|
|
|
|
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
|
|
ALTER SYSTEM RESET timescaledb.materializations_per_refresh_window;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
----------------
|
|
t
|
|
(1 row)
|
|
|
|
SET ROLE :ROLE_DEFAULT_PERM_USER;
|
|
DROP MATERIALIZED VIEW test_continuous_agg_view;
|
|
NOTICE: drop cascades to table _timescaledb_internal._hyper_3_4_chunk
|
|
--create a view with a function that it has no permission to execute
|
|
CREATE MATERIALIZED VIEW test_continuous_agg_view
|
|
WITH (timescaledb.continuous,
|
|
timescaledb.materialized_only=true)
|
|
AS SELECT time_bucket('2', time), SUM(data) as value, get_constant_no_perms()
|
|
FROM test_continuous_agg_table
|
|
GROUP BY 1 WITH NO DATA;
|
|
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval);
|
|
add_continuous_aggregate_policy
|
|
---------------------------------
|
|
1002
|
|
(1 row)
|
|
|
|
SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- job fails
|
|
SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
where job_id=:job_id;
|
|
job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes
|
|
--------+------------------+------------+-----------------+----------------+---------------
|
|
1002 | f | 1 | 0 | 1 | 0
|
|
(1 row)
|
|
|
|
DROP MATERIALIZED VIEW test_continuous_agg_view;
|
|
--advance clock to quit scheduler
|
|
SELECT ts_bgw_params_reset_time(extract(epoch from interval '25 hour')::bigint * 1000000, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
select ts_bgw_db_scheduler_test_wait_for_scheduler_finish();
|
|
ts_bgw_db_scheduler_test_wait_for_scheduler_finish
|
|
----------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_params_mock_wait_returns_immediately(:WAIT_ON_JOB);
|
|
ts_bgw_params_mock_wait_returns_immediately
|
|
---------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
--clear log for next run of the scheduler
|
|
TRUNCATE public.bgw_log;
|
|
SELECT ts_bgw_params_reset_time();
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
--
|
|
-- Test creating continuous aggregate with a user that is the non-owner of the raw table
|
|
--
|
|
CREATE TABLE test_continuous_agg_table_w_grant(time int, data int);
|
|
SELECT create_hypertable('test_continuous_agg_table_w_grant', 'time', chunk_time_interval => 10);
|
|
NOTICE: adding not-null constraint to column "time"
|
|
create_hypertable
|
|
------------------------------------------------
|
|
(5,public,test_continuous_agg_table_w_grant,t)
|
|
(1 row)
|
|
|
|
CREATE OR REPLACE FUNCTION integer_now_test1() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM test_continuous_agg_table_w_grant $$;
|
|
SELECT set_integer_now_func('test_continuous_agg_table_w_grant', 'integer_now_test1');
|
|
set_integer_now_func
|
|
----------------------
|
|
|
|
(1 row)
|
|
|
|
GRANT SELECT, TRIGGER ON test_continuous_agg_table_w_grant TO public;
|
|
INSERT INTO test_continuous_agg_table_w_grant
|
|
SELECT 1 , 1;
|
|
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
|
|
-- make sure view can be created
|
|
CREATE MATERIALIZED VIEW test_continuous_agg_view_user_2
|
|
WITH ( timescaledb.continuous,
|
|
timescaledb.materialized_only=true)
|
|
AS SELECT time_bucket('2', time), SUM(data) as value
|
|
FROM test_continuous_agg_table_w_grant
|
|
GROUP BY 1 WITH NO DATA;
|
|
SELECT add_continuous_aggregate_policy('test_continuous_agg_view_user_2', NULL, -2::integer, '12 h'::interval);
|
|
add_continuous_aggregate_policy
|
|
---------------------------------
|
|
1003
|
|
(1 row)
|
|
|
|
SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT id, owner FROM _timescaledb_config.bgw_job WHERE id = :job_id ;
|
|
id | owner
|
|
------+---------------------
|
|
1003 | default_perm_user_2
|
|
(1 row)
|
|
|
|
SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
where job_id=:job_id;
|
|
job_id | until_next | last_run_success | total_runs | total_successes | total_failures | total_crashes
|
|
--------+------------+------------------+------------+-----------------+----------------+---------------
|
|
1003 | @ 12 hours | t | 1 | 1 | 0 | 0
|
|
(1 row)
|
|
|
|
--view is populated
|
|
SELECT * FROM test_continuous_agg_view_user_2 ORDER BY 1;
|
|
time_bucket | value
|
|
-------------+-------
|
|
0 | 1
|
|
(1 row)
|
|
|
|
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
|
|
--revoke permissions from the continuous agg view owner to select from raw table
|
|
--no further updates to cont agg should happen
|
|
REVOKE SELECT ON test_continuous_agg_table_w_grant FROM public;
|
|
--add new data to table
|
|
INSERT INTO test_continuous_agg_table_w_grant VALUES(5,1);
|
|
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
|
|
--advance time by 12h so that job tries to run one more time
|
|
SELECT ts_bgw_params_reset_time(extract(epoch from interval '12 hour')::bigint * 1000000, true);
|
|
ts_bgw_params_reset_time
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25);
|
|
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish
|
|
------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
--should show a failing execution because no longer has permissions (due to lack of permission on partial view owner's part)
|
|
SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes
|
|
FROM _timescaledb_internal.bgw_job_stat
|
|
where job_id=:job_id;
|
|
job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes
|
|
--------+------------------+------------+-----------------+----------------+---------------
|
|
1003 | f | 2 | 1 | 1 | 0
|
|
(1 row)
|
|
|
|
--view was NOT updated; but the old stuff is still there
|
|
SELECT * FROM test_continuous_agg_view_user_2;
|
|
time_bucket | value
|
|
-------------+-------
|
|
0 | 1
|
|
(1 row)
|
|
|
|
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
|
|
SELECT * from sorted_bgw_log;
|
|
msg_no | mock_time | application_name | msg
|
|
--------+-------------+--------------------------------------------+--------------------------------------------------------------------------------------------------------------------------
|
|
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 0 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 0 | Refresh Continuous Aggregate Policy [1003] | continuous aggregate refresh (individual invalidation) on "test_continuous_agg_view_user_2" in window [ -2147483648, 2 ]
|
|
1 | 0 | Refresh Continuous Aggregate Policy [1003] | deleted 0 row(s) from materialization table "_timescaledb_internal._materialized_hypertable_6"
|
|
2 | 0 | Refresh Continuous Aggregate Policy [1003] | inserted 1 row(s) into materialization table "_timescaledb_internal._materialized_hypertable_6"
|
|
0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker
|
|
1 | 43200000000 | DB Scheduler | [TESTING] Wait until (RANDOM), started at (RANDOM)
|
|
0 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | job 1003 threw an error
|
|
1 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | permission denied for table test_continuous_agg_table_w_grant
|
|
(9 rows)
|
|
|
|
-- Count the number of continuous aggregate policies
|
|
SELECT count(*) FROM _timescaledb_config.bgw_job
|
|
WHERE proc_schema = '_timescaledb_functions'
|
|
AND proc_name = 'policy_refresh_continuous_aggregate';
|
|
count
|
|
-------
|
|
1
|
|
(1 row)
|
|
|