From b6a974e7f3c7439c1ef423ea64431ce2a907e068 Mon Sep 17 00:00:00 2001 From: Konstantina Skovola Date: Mon, 23 May 2022 18:48:09 +0300 Subject: [PATCH] Add schedule_interval to policies Add a parameter `schedule_interval` to retention and compression policies to allow users to define the schedule interval. Fall back to previous default if no value is specified. Fixes #3806 --- sql/policy_api.sql | 9 +- sql/updates/latest-dev.sql | 3 + sql/updates/reverse-dev.sql | 7 ++ tsl/src/bgw_policy/compression_api.c | 14 ++- tsl/src/bgw_policy/retention_api.c | 11 ++- tsl/test/expected/bgw_policy.out | 98 +++++++++++++++++++ tsl/test/expected/bgw_reorder_drop_chunks.out | 90 +++++++++++++++++ tsl/test/shared/expected/extension.out | 4 +- tsl/test/sql/bgw_policy.sql | 33 +++++++ tsl/test/sql/bgw_reorder_drop_chunks.sql | 29 +++++- 10 files changed, 287 insertions(+), 11 deletions(-) diff --git a/sql/policy_api.sql b/sql/policy_api.sql index 5b22f9081..f5e54431d 100644 --- a/sql/policy_api.sql +++ b/sql/policy_api.sql @@ -13,10 +13,11 @@ CREATE OR REPLACE FUNCTION @extschema@.add_retention_policy( relation REGCLASS, drop_after "any", - if_not_exists BOOL = false + if_not_exists BOOL = false, + schedule_interval INTERVAL = NULL ) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add' -LANGUAGE C VOLATILE STRICT; +LANGUAGE C VOLATILE; CREATE OR REPLACE FUNCTION @extschema@.remove_retention_policy( relation REGCLASS, @@ -35,10 +36,10 @@ AS '@MODULE_PATHNAME@', 'ts_policy_reorder_remove' LANGUAGE C VOLATILE STRICT; /* compression policy */ -CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL = false) +CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL = false, schedule_interval INTERVAL = NULL) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_compression_add' -LANGUAGE C VOLATILE STRICT; +LANGUAGE C VOLATILE; -- not strict because we need to set different default values for schedule_interval CREATE OR REPLACE FUNCTION @extschema@.remove_compression_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL AS '@MODULE_PATHNAME@', 'ts_policy_compression_remove' diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index e69de29bb..60a5b4b3b 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -0,0 +1,3 @@ +DROP FUNCTION IF EXISTS @extschema@.add_retention_policy(REGCLASS, "any", BOOL); + +DROP FUNCTION IF EXISTS @extschema@.add_compression_policy(REGCLASS, "any", BOOL); diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index e69de29bb..5077a7621 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -0,0 +1,7 @@ +DROP FUNCTION IF EXISTS @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL); +CREATE FUNCTION @extschema@.add_retention_policy(relation REGCLASS, drop_after "any", if_not_exists BOOL = false) +RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add' LANGUAGE C VOLATILE STRICT; + +DROP FUNCTION IF EXISTS @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL); +CREATE FUNCTION @extschema@.add_compression_policy(hypertable REGCLASS, compress_after "any", if_not_exists BOOL = false) +RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_compression_add' LANGUAGE C VOLATILE STRICT; diff --git a/tsl/src/bgw_policy/compression_api.c b/tsl/src/bgw_policy/compression_api.c index d043d7177..615d5b9aa 100644 --- a/tsl/src/bgw_policy/compression_api.c +++ b/tsl/src/bgw_policy/compression_api.c @@ -172,14 +172,21 @@ validate_compress_after_type(Oid partitioning_type, Oid compress_after_type) Datum policy_compression_add(PG_FUNCTION_ARGS) { + /* The function is not STRICT but we can't allow required args to be NULL + * so we need to act like a strict function in those cases */ + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) + PG_RETURN_NULL(); + NameData application_name; NameData proc_name, proc_schema, owner; int32 job_id; Oid user_rel_oid = PG_GETARG_OID(0); Datum compress_after_datum = PG_GETARG_DATUM(1); - Oid compress_after_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1); + Oid compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 1); bool if_not_exists = PG_GETARG_BOOL(2); - Interval *default_schedule_interval = DEFAULT_SCHEDULE_INTERVAL; + bool user_defined_schedule_interval = !(PG_ARGISNULL(3)); + Interval *default_schedule_interval = + PG_ARGISNULL(3) ? DEFAULT_SCHEDULE_INTERVAL : PG_GETARG_INTERVAL_P(3); Hypertable *hypertable; Cache *hcache; const Dimension *dim; @@ -241,7 +248,8 @@ policy_compression_add(PG_FUNCTION_ARGS) } } - if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim))) + if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)) && + !user_defined_schedule_interval) { default_schedule_interval = DatumGetIntervalP( ts_internal_to_interval_value(dim->fd.interval_length / 2, INTERVALOID)); diff --git a/tsl/src/bgw_policy/retention_api.c b/tsl/src/bgw_policy/retention_api.c index 7d8a26b53..e0192b287 100644 --- a/tsl/src/bgw_policy/retention_api.c +++ b/tsl/src/bgw_policy/retention_api.c @@ -29,6 +29,10 @@ #define POLICY_RETENTION_PROC_NAME "policy_retention" #define CONFIG_KEY_HYPERTABLE_ID "hypertable_id" #define CONFIG_KEY_DROP_AFTER "drop_after" +#define DEFAULT_SCHEDULE_INTERVAL \ + { \ + .day = 1 \ + } Datum policy_retention_proc(PG_FUNCTION_ARGS) @@ -135,12 +139,18 @@ validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid) Datum policy_retention_add(PG_FUNCTION_ARGS) { + /* behave like a strict function */ + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2)) + PG_RETURN_NULL(); + NameData application_name; int32 job_id; Oid ht_oid = PG_GETARG_OID(0); Datum window_datum = PG_GETARG_DATUM(1); bool if_not_exists = PG_GETARG_BOOL(2); Oid window_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1); + Interval default_schedule_interval = + PG_ARGISNULL(3) ? (Interval) DEFAULT_SCHEDULE_INTERVAL : *PG_GETARG_INTERVAL_P(3); Hypertable *hypertable; Cache *hcache; @@ -148,7 +158,6 @@ policy_retention_add(PG_FUNCTION_ARGS) Oid partitioning_type; const Dimension *dim; /* Default scheduled interval for drop_chunks jobs is currently 1 day (24 hours) */ - Interval default_schedule_interval = { .day = 1 }; /* Default max runtime should not be very long. Right now set to 5 minutes */ Interval default_max_runtime = { .time = 5 * USECS_PER_MINUTE }; /* Default retry period is currently 5 minutes */ diff --git a/tsl/test/expected/bgw_policy.out b/tsl/test/expected/bgw_policy.out index ef9e71901..68ad8e3c4 100644 --- a/tsl/test/expected/bgw_policy.out +++ b/tsl/test/expected/bgw_policy.out @@ -621,3 +621,101 @@ GROUP BY proc_name; policy_retention | 2 (2 rows) +-- test that the behavior is strict when providing NULL required arguments +create table test_strict (time timestamptz not null, a int, b int); +select create_hypertable('test_strict', 'time'); + create_hypertable +-------------------------- + (6,public,test_strict,t) +(1 row) + +-- test retention with null arguments +select add_retention_policy('test_strict', drop_after => NULL); + add_retention_policy +---------------------- + +(1 row) + +select add_retention_policy(NULL, NULL); + add_retention_policy +---------------------- + +(1 row) + +select add_retention_policy(NULL, drop_after => interval '2 days'); + add_retention_policy +---------------------- + +(1 row) + +-- this is an optional argument +select add_retention_policy('test_strict', drop_after => interval '2 days', if_not_exists => NULL); + add_retention_policy +---------------------- + +(1 row) + +select add_retention_policy('test_strict', interval '2 days', schedule_interval => NULL); + add_retention_policy +---------------------- + 1006 +(1 row) + +-- test compression with null arguments +alter table test_strict set (timescaledb.compress); +select add_compression_policy('test_strict', compress_after => NULL); + add_compression_policy +------------------------ + +(1 row) + +select add_compression_policy(NULL, compress_after => NULL); + add_compression_policy +------------------------ + +(1 row) + +select add_compression_policy('test_strict', INTERVAL '2 weeks', if_not_exists => NULL); + add_compression_policy +------------------------ + +(1 row) + +select add_compression_policy('test_strict', INTERVAL '2 weeks', schedule_interval => NULL); + add_compression_policy +------------------------ + 1007 +(1 row) + +-- test that we get the default schedule_interval if nothing is specified +create table test_missing_schedint (time timestamptz not null, a int, b int); +select create_hypertable('test_missing_schedint', 'time', chunk_time_interval=> '31days'::interval); + create_hypertable +------------------------------------ + (8,public,test_missing_schedint,t) +(1 row) + +-- we expect shedule_interval to be 1 day +select add_retention_policy('test_missing_schedint', interval '2 weeks') as retenion_id_missing_schedint \gset +-- we expect schedule_interval to be chunk_time_interval/2 for timestamptz time +alter table test_missing_schedint set (timescaledb.compress); +select add_compression_policy('test_missing_schedint', interval '60 days') as compression_id_missing_schedint \gset +-- we expect schedule_interval to be 1 day for int time +create table test_missing_schedint_integer (time int not null, a int, b int); +-- 10 days interval +select create_hypertable('test_missing_schedint_integer', 'time', chunk_time_interval => 864000000); + create_hypertable +--------------------------------------------- + (10,public,test_missing_schedint_integer,t) +(1 row) + +alter table test_missing_schedint_integer set (timescaledb.compress); +select add_compression_policy('test_missing_schedint_integer', BIGINT '600000') as compression_id_integer \gset +select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_schedint, :compression_id_missing_schedint, :compression_id_integer); + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config +------+---------------------------+--------------------+-------------+-------------+--------------+-----------------------+--------------------+---------------------+-----------+---------------+----------------------------------------------------- + 1008 | Retention Policy [1008] | @ 1 day | @ 5 mins | -1 | @ 5 mins | _timescaledb_internal | policy_retention | default_perm_user_2 | t | 8 | {"drop_after": "@ 14 days", "hypertable_id": 8} + 1009 | Compression Policy [1009] | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour | _timescaledb_internal | policy_compression | default_perm_user_2 | t | 8 | {"hypertable_id": 8, "compress_after": "@ 60 days"} + 1010 | Compression Policy [1010] | @ 1 day | @ 0 | -1 | @ 1 hour | _timescaledb_internal | policy_compression | default_perm_user_2 | t | 10 | {"hypertable_id": 10, "compress_after": 600000} +(3 rows) + diff --git a/tsl/test/expected/bgw_reorder_drop_chunks.out b/tsl/test/expected/bgw_reorder_drop_chunks.out index 4caa56105..33f10591e 100644 --- a/tsl/test/expected/bgw_reorder_drop_chunks.out +++ b/tsl/test/expected/bgw_reorder_drop_chunks.out @@ -659,3 +659,93 @@ SELECT * FROM sorted_bgw_log; 0 | 1000000 | DB Scheduler | [TESTING] Wait until 2000000, started at 1000000 (6 rows) +-- test the schedule_interval parameter for policies +CREATE TABLE test_schedint(time timestamptz, a int, b int); +select create_hypertable('test_schedint', 'time'); +NOTICE: adding not-null constraint to column "time" + create_hypertable +---------------------------- + (5,public,test_schedint,t) +(1 row) + +insert into test_schedint values (now(), 1, 2), (now() + interval '2 seconds', 2, 3); +-- test the retention policy +select add_retention_policy('test_schedint', interval '2 months', schedule_interval => '30 seconds') as polret_schedint \gset +-- wait for a bit more than "schedule_interval" seconds, then verify the policy has run twice +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(1000); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polret_schedint; + total_runs | total_successes | total_failures +------------+-----------------+---------------- + 1 | 1 | 0 +(1 row) + +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(30000); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polret_schedint; + total_runs | total_successes | total_failures +------------+-----------------+---------------- + 2 | 2 | 0 +(1 row) + +-- if we wait another 30s, we should see 3 runs of the job +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(30000); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polret_schedint; + total_runs | total_successes | total_failures +------------+-----------------+---------------- + 3 | 3 | 0 +(1 row) + +-- test the compression policy +alter table test_schedint set (timescaledb.compress); +select add_compression_policy('test_schedint', interval '3 weeks', schedule_interval => '40 seconds') as polcomp_schedint \gset +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(1000); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polcomp_schedint; + total_runs | total_successes | total_failures +------------+-----------------+---------------- + 1 | 1 | 0 +(1 row) + +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(40000); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polcomp_schedint; + total_runs | total_successes | total_failures +------------+-----------------+---------------- + 2 | 2 | 0 +(1 row) + +-- if we wait another 40s, we should see 3 runs of the job +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(40000); + ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish +------------------------------------------------------------ + +(1 row) + +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polcomp_schedint; + total_runs | total_successes | total_failures +------------+-----------------+---------------- + 3 | 3 | 0 +(1 row) + diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index e50545e08..9c760f598 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -126,13 +126,13 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_internal.tsl_loaded() _timescaledb_internal.validate_as_data_node() _timescaledb_internal.wait_subscription_sync(name,name,integer,numeric) - add_compression_policy(regclass,"any",boolean) + add_compression_policy(regclass,"any",boolean,interval) add_continuous_aggregate_policy(regclass,"any","any",interval,boolean) add_data_node(name,text,name,integer,boolean,boolean,text) add_dimension(regclass,name,integer,anyelement,regproc,boolean) add_job(regproc,interval,jsonb,timestamp with time zone,boolean) add_reorder_policy(regclass,name,boolean) - add_retention_policy(regclass,"any",boolean) + add_retention_policy(regclass,"any",boolean,interval) alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean) approximate_row_count(regclass) attach_data_node(name,regclass,boolean,boolean) diff --git a/tsl/test/sql/bgw_policy.sql b/tsl/test/sql/bgw_policy.sql index ea0e6713e..fca1a2e3a 100644 --- a/tsl/test/sql/bgw_policy.sql +++ b/tsl/test/sql/bgw_policy.sql @@ -330,3 +330,36 @@ WHERE proc_name NOT LIKE '%telemetry%' GROUP BY proc_name; +-- test that the behavior is strict when providing NULL required arguments +create table test_strict (time timestamptz not null, a int, b int); +select create_hypertable('test_strict', 'time'); +-- test retention with null arguments +select add_retention_policy('test_strict', drop_after => NULL); +select add_retention_policy(NULL, NULL); +select add_retention_policy(NULL, drop_after => interval '2 days'); +-- this is an optional argument +select add_retention_policy('test_strict', drop_after => interval '2 days', if_not_exists => NULL); +select add_retention_policy('test_strict', interval '2 days', schedule_interval => NULL); +-- test compression with null arguments +alter table test_strict set (timescaledb.compress); +select add_compression_policy('test_strict', compress_after => NULL); +select add_compression_policy(NULL, compress_after => NULL); +select add_compression_policy('test_strict', INTERVAL '2 weeks', if_not_exists => NULL); +select add_compression_policy('test_strict', INTERVAL '2 weeks', schedule_interval => NULL); + +-- test that we get the default schedule_interval if nothing is specified +create table test_missing_schedint (time timestamptz not null, a int, b int); +select create_hypertable('test_missing_schedint', 'time', chunk_time_interval=> '31days'::interval); +-- we expect shedule_interval to be 1 day +select add_retention_policy('test_missing_schedint', interval '2 weeks') as retenion_id_missing_schedint \gset +-- we expect schedule_interval to be chunk_time_interval/2 for timestamptz time +alter table test_missing_schedint set (timescaledb.compress); +select add_compression_policy('test_missing_schedint', interval '60 days') as compression_id_missing_schedint \gset +-- we expect schedule_interval to be 1 day for int time +create table test_missing_schedint_integer (time int not null, a int, b int); +-- 10 days interval +select create_hypertable('test_missing_schedint_integer', 'time', chunk_time_interval => 864000000); +alter table test_missing_schedint_integer set (timescaledb.compress); +select add_compression_policy('test_missing_schedint_integer', BIGINT '600000') as compression_id_integer \gset + +select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_schedint, :compression_id_missing_schedint, :compression_id_integer); diff --git a/tsl/test/sql/bgw_reorder_drop_chunks.sql b/tsl/test/sql/bgw_reorder_drop_chunks.sql index dc5efdbb8..0f8be3c2e 100644 --- a/tsl/test/sql/bgw_reorder_drop_chunks.sql +++ b/tsl/test/sql/bgw_reorder_drop_chunks.sql @@ -336,4 +336,31 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(1000); CALL run_job(:drop_chunks_tsntz_job_id); SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(1000); -SELECT * FROM sorted_bgw_log; \ No newline at end of file +SELECT * FROM sorted_bgw_log; + +-- test the schedule_interval parameter for policies +CREATE TABLE test_schedint(time timestamptz, a int, b int); +select create_hypertable('test_schedint', 'time'); +insert into test_schedint values (now(), 1, 2), (now() + interval '2 seconds', 2, 3); + +-- test the retention policy +select add_retention_policy('test_schedint', interval '2 months', schedule_interval => '30 seconds') as polret_schedint \gset +-- wait for a bit more than "schedule_interval" seconds, then verify the policy has run twice +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(1000); +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polret_schedint; +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(30000); +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polret_schedint; +-- if we wait another 30s, we should see 3 runs of the job +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(30000); +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polret_schedint; + +-- test the compression policy +alter table test_schedint set (timescaledb.compress); +select add_compression_policy('test_schedint', interval '3 weeks', schedule_interval => '40 seconds') as polcomp_schedint \gset +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(1000); +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polcomp_schedint; +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(40000); +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polcomp_schedint; +-- if we wait another 40s, we should see 3 runs of the job +select ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(40000); +select total_runs, total_successes, total_failures from timescaledb_information.job_stats where job_id = :polcomp_schedint; \ No newline at end of file