mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-21 13:14:19 +08:00
Use creation time in retention/compression policy
The retention and compression policies can now use drop_created_before and compress_created_before arguments respectively to specify chunk selection using their creation times. We don't support creation times for CAggs, yet.
This commit is contained in:
parent
e5840e3d45
commit
44817252b5
.github/workflows
.unreleased
sql
tsl
src
bgw_policy
compression_api.ccompression_api.hjob.cjob.hpolicies_v2.cpolicies_v2.hpolicy_utils.cretention_api.cretention_api.h
chunk.cchunk.htest
4
.github/workflows/pgspot.yaml
vendored
4
.github/workflows/pgspot.yaml
vendored
@ -21,6 +21,10 @@ jobs:
|
||||
--proc-without-search-path 'extschema.recompress_chunk(chunk regclass,if_not_compressed boolean)'
|
||||
--proc-without-search-path '_timescaledb_internal.policy_compression(job_id integer,config jsonb)'
|
||||
--proc-without-search-path '_timescaledb_functions.policy_compression(job_id integer,config jsonb)'
|
||||
--proc-without-search-path
|
||||
'_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
|
||||
--proc-without-search-path
|
||||
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
|
||||
--proc-without-search-path
|
||||
'_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean)'
|
||||
--proc-without-search-path
|
||||
|
1
.unreleased/feature_6227
Normal file
1
.unreleased/feature_6227
Normal file
@ -0,0 +1 @@
|
||||
Implements: #6227 Use creation time in retention/compression policy
|
@ -1015,12 +1015,12 @@ END$$
|
||||
SET search_path TO pg_catalog,pg_temp;
|
||||
|
||||
|
||||
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean) LANGUAGE PLPGSQL AS $$
|
||||
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean) LANGUAGE PLPGSQL AS $$
|
||||
BEGIN
|
||||
IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN
|
||||
RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
|
||||
RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
|
||||
END IF;
|
||||
CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6);
|
||||
CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6,$7);
|
||||
END$$
|
||||
SET search_path TO pg_catalog,pg_temp;
|
||||
|
||||
|
@ -12,11 +12,12 @@
|
||||
-- might be kept, but data within the window will never be deleted.
|
||||
CREATE OR REPLACE FUNCTION @extschema@.add_retention_policy(
|
||||
relation REGCLASS,
|
||||
drop_after "any",
|
||||
drop_after "any" = NULL,
|
||||
if_not_exists BOOL = false,
|
||||
schedule_interval INTERVAL = NULL,
|
||||
initial_start TIMESTAMPTZ = NULL,
|
||||
timezone TEXT = NULL
|
||||
timezone TEXT = NULL,
|
||||
drop_created_before INTERVAL = NULL
|
||||
)
|
||||
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
|
||||
LANGUAGE C VOLATILE;
|
||||
@ -45,11 +46,13 @@ LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
/* compression policy */
|
||||
CREATE OR REPLACE FUNCTION @extschema@.add_compression_policy(
|
||||
hypertable REGCLASS, compress_after "any",
|
||||
hypertable REGCLASS,
|
||||
compress_after "any" = NULL,
|
||||
if_not_exists BOOL = false,
|
||||
schedule_interval INTERVAL = NULL,
|
||||
initial_start TIMESTAMPTZ = NULL,
|
||||
timezone TEXT = NULL
|
||||
timezone TEXT = NULL,
|
||||
compress_created_before INTERVAL = NULL
|
||||
)
|
||||
RETURNS INTEGER
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
|
||||
@ -83,6 +86,7 @@ LANGUAGE C VOLATILE;
|
||||
/* 1 step policies */
|
||||
|
||||
/* Add policies */
|
||||
/* Unsupported drop_created_before/compress_created_before in add/alter for caggs */
|
||||
CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies(
|
||||
relation REGCLASS,
|
||||
if_not_exists BOOL = false,
|
||||
@ -128,4 +132,4 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.show_policies(
|
||||
relation REGCLASS)
|
||||
RETURNS SETOF JSONB
|
||||
AS '@MODULE_PATHNAME@', 'ts_policies_show'
|
||||
LANGUAGE C VOLATILE;
|
||||
LANGUAGE C VOLATILE;
|
||||
|
@ -41,7 +41,8 @@ _timescaledb_functions.policy_compression_execute(
|
||||
lag ANYELEMENT,
|
||||
maxchunks INTEGER,
|
||||
verbose_log BOOLEAN,
|
||||
recompress_enabled BOOLEAN)
|
||||
recompress_enabled BOOLEAN,
|
||||
use_creation_time BOOLEAN)
|
||||
AS $$
|
||||
DECLARE
|
||||
htoid REGCLASS;
|
||||
@ -54,6 +55,7 @@ DECLARE
|
||||
bit_compressed_unordered int := 2;
|
||||
bit_frozen int := 4;
|
||||
bit_compressed_partial int := 8;
|
||||
creation_lag INTERVAL := NULL;
|
||||
BEGIN
|
||||
|
||||
-- procedures with SET clause cannot execute transaction
|
||||
@ -67,14 +69,26 @@ BEGIN
|
||||
-- for the integer cases, we have to compute the lag w.r.t
|
||||
-- the integer_now function and then pass on to show_chunks
|
||||
IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
|
||||
-- cannot have use_creation_time set with this
|
||||
IF use_creation_time IS TRUE THEN
|
||||
RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
|
||||
END IF;
|
||||
lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
|
||||
END IF;
|
||||
|
||||
-- if use_creation_time has been specified then the lag needs to be used with the
|
||||
-- "compress_created_before" argument. Otherwise the usual "older_than" argument
|
||||
-- is good enough
|
||||
IF use_creation_time IS TRUE THEN
|
||||
creation_lag := lag;
|
||||
lag := NULL;
|
||||
END IF;
|
||||
|
||||
FOR chunk_rec IN
|
||||
SELECT
|
||||
show.oid, ch.schema_name, ch.table_name, ch.status
|
||||
FROM
|
||||
@extschema@.show_chunks(htoid, older_than => lag) AS show(oid)
|
||||
@extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
|
||||
INNER JOIN pg_class pgc ON pgc.oid = show.oid
|
||||
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
|
||||
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
|
||||
@ -157,7 +171,9 @@ DECLARE
|
||||
dimtype REGTYPE;
|
||||
dimtypeinput REGPROC;
|
||||
compress_after TEXT;
|
||||
compress_created_before TEXT;
|
||||
lag_value TEXT;
|
||||
lag_bigint_value BIGINT;
|
||||
htid INTEGER;
|
||||
htoid REGCLASS;
|
||||
chunk_rec RECORD;
|
||||
@ -165,6 +181,7 @@ DECLARE
|
||||
maxchunks INTEGER := 0;
|
||||
numchunks INTEGER := 1;
|
||||
recompress_enabled BOOL;
|
||||
use_creation_time BOOL := FALSE;
|
||||
BEGIN
|
||||
|
||||
-- procedures with SET clause cannot execute transaction
|
||||
@ -183,11 +200,6 @@ BEGIN
|
||||
verbose_log := COALESCE(jsonb_object_field_text(config, 'verbose_log')::BOOLEAN, FALSE);
|
||||
maxchunks := COALESCE(jsonb_object_field_text(config, 'maxchunks_to_compress')::INTEGER, 0);
|
||||
recompress_enabled := COALESCE(jsonb_object_field_text(config, 'recompress')::BOOLEAN, TRUE);
|
||||
compress_after := jsonb_object_field_text(config, 'compress_after');
|
||||
|
||||
IF compress_after IS NULL THEN
|
||||
RAISE EXCEPTION 'job % config must have compress_after', job_id;
|
||||
END IF;
|
||||
|
||||
-- find primary dimension type --
|
||||
SELECT dim.column_type INTO dimtype
|
||||
@ -197,29 +209,40 @@ BEGIN
|
||||
ORDER BY dim.id
|
||||
LIMIT 1;
|
||||
|
||||
lag_value := jsonb_object_field_text(config, 'compress_after');
|
||||
compress_after := jsonb_object_field_text(config, 'compress_after');
|
||||
IF compress_after IS NULL THEN
|
||||
compress_created_before := jsonb_object_field_text(config, 'compress_created_before');
|
||||
IF compress_created_before IS NULL THEN
|
||||
RAISE EXCEPTION 'job % config must have compress_after or compress_created_before', job_id;
|
||||
END IF;
|
||||
lag_value := compress_created_before;
|
||||
use_creation_time := true;
|
||||
dimtype := 'INTERVAL' ::regtype;
|
||||
ELSE
|
||||
lag_value := compress_after;
|
||||
END IF;
|
||||
|
||||
-- execute the properly type casts for the lag value
|
||||
CASE dimtype
|
||||
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype THEN
|
||||
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype THEN
|
||||
CALL _timescaledb_functions.policy_compression_execute(
|
||||
job_id, htid, lag_value::INTERVAL,
|
||||
maxchunks, verbose_log, recompress_enabled
|
||||
maxchunks, verbose_log, recompress_enabled, use_creation_time
|
||||
);
|
||||
WHEN 'BIGINT'::regtype THEN
|
||||
CALL _timescaledb_functions.policy_compression_execute(
|
||||
job_id, htid, lag_value::BIGINT,
|
||||
maxchunks, verbose_log, recompress_enabled
|
||||
maxchunks, verbose_log, recompress_enabled, use_creation_time
|
||||
);
|
||||
WHEN 'INTEGER'::regtype THEN
|
||||
CALL _timescaledb_functions.policy_compression_execute(
|
||||
job_id, htid, lag_value::INTEGER,
|
||||
maxchunks, verbose_log, recompress_enabled
|
||||
maxchunks, verbose_log, recompress_enabled, use_creation_time
|
||||
);
|
||||
WHEN 'SMALLINT'::regtype THEN
|
||||
CALL _timescaledb_functions.policy_compression_execute(
|
||||
job_id, htid, lag_value::SMALLINT,
|
||||
maxchunks, verbose_log, recompress_enabled
|
||||
maxchunks, verbose_log, recompress_enabled, use_creation_time
|
||||
);
|
||||
END CASE;
|
||||
END;
|
||||
|
@ -250,3 +250,156 @@ CREATE FUNCTION @extschema@.show_chunks(
|
||||
created_after "any" = NULL
|
||||
) RETURNS SETOF REGCLASS AS '@MODULE_PATHNAME@', 'ts_chunk_show_chunks'
|
||||
LANGUAGE C STABLE PARALLEL SAFE;
|
||||
|
||||
DROP FUNCTION @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT);
|
||||
CREATE FUNCTION @extschema@.add_retention_policy(
|
||||
relation REGCLASS,
|
||||
drop_after "any" = NULL,
|
||||
if_not_exists BOOL = false,
|
||||
schedule_interval INTERVAL = NULL,
|
||||
initial_start TIMESTAMPTZ = NULL,
|
||||
timezone TEXT = NULL,
|
||||
drop_created_before INTERVAL = NULL
|
||||
)
|
||||
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
|
||||
LANGUAGE C VOLATILE;
|
||||
|
||||
DROP FUNCTION @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT);
|
||||
CREATE FUNCTION @extschema@.add_compression_policy(
|
||||
hypertable REGCLASS,
|
||||
compress_after "any" = NULL,
|
||||
if_not_exists BOOL = false,
|
||||
schedule_interval INTERVAL = NULL,
|
||||
initial_start TIMESTAMPTZ = NULL,
|
||||
timezone TEXT = NULL,
|
||||
compress_created_before INTERVAL = NULL
|
||||
)
|
||||
RETURNS INTEGER
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
|
||||
LANGUAGE C VOLATILE;
|
||||
|
||||
DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN);
|
||||
DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN);
|
||||
CREATE PROCEDURE
|
||||
_timescaledb_functions.policy_compression_execute(
|
||||
job_id INTEGER,
|
||||
htid INTEGER,
|
||||
lag ANYELEMENT,
|
||||
maxchunks INTEGER,
|
||||
verbose_log BOOLEAN,
|
||||
recompress_enabled BOOLEAN,
|
||||
use_creation_time BOOLEAN)
|
||||
AS $$
|
||||
DECLARE
|
||||
htoid REGCLASS;
|
||||
chunk_rec RECORD;
|
||||
numchunks INTEGER := 1;
|
||||
_message text;
|
||||
_detail text;
|
||||
-- chunk status bits:
|
||||
bit_compressed int := 1;
|
||||
bit_compressed_unordered int := 2;
|
||||
bit_frozen int := 4;
|
||||
bit_compressed_partial int := 8;
|
||||
creation_lag INTERVAL := NULL;
|
||||
BEGIN
|
||||
|
||||
-- procedures with SET clause cannot execute transaction
|
||||
-- control so we adjust search_path in procedure body
|
||||
SET LOCAL search_path TO pg_catalog, pg_temp;
|
||||
|
||||
SELECT format('%I.%I', schema_name, table_name) INTO htoid
|
||||
FROM _timescaledb_catalog.hypertable
|
||||
WHERE id = htid;
|
||||
|
||||
-- for the integer cases, we have to compute the lag w.r.t
|
||||
-- the integer_now function and then pass on to show_chunks
|
||||
IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
|
||||
-- cannot have use_creation_time set with this
|
||||
IF use_creation_time IS TRUE THEN
|
||||
RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
|
||||
END IF;
|
||||
lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
|
||||
END IF;
|
||||
|
||||
-- if use_creation_time has been specified then the lag needs to be used with the
|
||||
-- "compress_created_before" argument. Otherwise the usual "older_than" argument
|
||||
-- is good enough
|
||||
IF use_creation_time IS TRUE THEN
|
||||
creation_lag := lag;
|
||||
lag := NULL;
|
||||
END IF;
|
||||
|
||||
FOR chunk_rec IN
|
||||
SELECT
|
||||
show.oid, ch.schema_name, ch.table_name, ch.status
|
||||
FROM
|
||||
@extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
|
||||
INNER JOIN pg_class pgc ON pgc.oid = show.oid
|
||||
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
|
||||
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
|
||||
WHERE
|
||||
ch.dropped IS FALSE
|
||||
AND (
|
||||
ch.status = 0 OR
|
||||
(
|
||||
ch.status & bit_compressed > 0 AND (
|
||||
ch.status & bit_compressed_unordered > 0 OR
|
||||
ch.status & bit_compressed_partial > 0
|
||||
)
|
||||
)
|
||||
)
|
||||
LOOP
|
||||
IF chunk_rec.status = 0 THEN
|
||||
BEGIN
|
||||
PERFORM @extschema@.compress_chunk( chunk_rec.oid );
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS
|
||||
_message = MESSAGE_TEXT,
|
||||
_detail = PG_EXCEPTION_DETAIL;
|
||||
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
|
||||
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
|
||||
ERRCODE = sqlstate;
|
||||
END;
|
||||
ELSIF
|
||||
(
|
||||
chunk_rec.status & bit_compressed > 0 AND (
|
||||
chunk_rec.status & bit_compressed_unordered > 0 OR
|
||||
chunk_rec.status & bit_compressed_partial > 0
|
||||
)
|
||||
) AND recompress_enabled IS TRUE THEN
|
||||
BEGIN
|
||||
PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
|
||||
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
|
||||
ERRCODE = sqlstate;
|
||||
END;
|
||||
-- SET LOCAL is only active until end of transaction.
|
||||
-- While we could use SET at the start of the function we do not
|
||||
-- want to bleed out search_path to caller, so we do SET LOCAL
|
||||
-- again after COMMIT
|
||||
BEGIN
|
||||
PERFORM @extschema@.compress_chunk(chunk_rec.oid);
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
|
||||
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
|
||||
ERRCODE = sqlstate;
|
||||
END;
|
||||
END IF;
|
||||
COMMIT;
|
||||
-- SET LOCAL is only active until end of transaction.
|
||||
-- While we could use SET at the start of the function we do not
|
||||
-- want to bleed out search_path to caller, so we do SET LOCAL
|
||||
-- again after COMMIT
|
||||
SET LOCAL search_path TO pg_catalog, pg_temp;
|
||||
IF verbose_log THEN
|
||||
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
|
||||
END IF;
|
||||
numchunks := numchunks + 1;
|
||||
IF maxchunks > 0 AND numchunks >= maxchunks THEN
|
||||
EXIT;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END;
|
||||
$$ LANGUAGE PLPGSQL;
|
||||
|
@ -197,3 +197,139 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.makeaclitem(regrole, regrole, tex
|
||||
|
||||
DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_validate_query(TEXT);
|
||||
|
||||
DROP FUNCTION @extschema@.add_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT, INTERVAL);
|
||||
CREATE FUNCTION @extschema@.add_retention_policy(
|
||||
relation REGCLASS,
|
||||
drop_after "any",
|
||||
if_not_exists BOOL = false,
|
||||
schedule_interval INTERVAL = NULL,
|
||||
initial_start TIMESTAMPTZ = NULL,
|
||||
timezone TEXT = NULL
|
||||
)
|
||||
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
|
||||
LANGUAGE C VOLATILE;
|
||||
|
||||
DROP FUNCTION @extschema@.add_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT, INTERVAL);
|
||||
CREATE FUNCTION @extschema@.add_compression_policy(
|
||||
hypertable REGCLASS,
|
||||
compress_after "any",
|
||||
if_not_exists BOOL = false,
|
||||
schedule_interval INTERVAL = NULL,
|
||||
initial_start TIMESTAMPTZ = NULL,
|
||||
timezone TEXT = NULL
|
||||
)
|
||||
RETURNS INTEGER
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
|
||||
LANGUAGE C VOLATILE;
|
||||
|
||||
DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN);
|
||||
DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN);
|
||||
CREATE PROCEDURE
|
||||
_timescaledb_functions.policy_compression_execute(
|
||||
job_id INTEGER,
|
||||
htid INTEGER,
|
||||
lag ANYELEMENT,
|
||||
maxchunks INTEGER,
|
||||
verbose_log BOOLEAN,
|
||||
recompress_enabled BOOLEAN)
|
||||
AS $$
|
||||
DECLARE
|
||||
htoid REGCLASS;
|
||||
chunk_rec RECORD;
|
||||
numchunks INTEGER := 1;
|
||||
_message text;
|
||||
_detail text;
|
||||
-- chunk status bits:
|
||||
bit_compressed int := 1;
|
||||
bit_compressed_unordered int := 2;
|
||||
bit_frozen int := 4;
|
||||
bit_compressed_partial int := 8;
|
||||
BEGIN
|
||||
|
||||
-- procedures with SET clause cannot execute transaction
|
||||
-- control so we adjust search_path in procedure body
|
||||
SET LOCAL search_path TO pg_catalog, pg_temp;
|
||||
|
||||
SELECT format('%I.%I', schema_name, table_name) INTO htoid
|
||||
FROM _timescaledb_catalog.hypertable
|
||||
WHERE id = htid;
|
||||
|
||||
-- for the integer cases, we have to compute the lag w.r.t
|
||||
-- the integer_now function and then pass on to show_chunks
|
||||
IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
|
||||
lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
|
||||
END IF;
|
||||
|
||||
FOR chunk_rec IN
|
||||
SELECT
|
||||
show.oid, ch.schema_name, ch.table_name, ch.status
|
||||
FROM
|
||||
@extschema@.show_chunks(htoid, older_than => lag) AS show(oid)
|
||||
INNER JOIN pg_class pgc ON pgc.oid = show.oid
|
||||
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
|
||||
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
|
||||
WHERE
|
||||
ch.dropped IS FALSE
|
||||
AND (
|
||||
ch.status = 0 OR
|
||||
(
|
||||
ch.status & bit_compressed > 0 AND (
|
||||
ch.status & bit_compressed_unordered > 0 OR
|
||||
ch.status & bit_compressed_partial > 0
|
||||
)
|
||||
)
|
||||
)
|
||||
LOOP
|
||||
IF chunk_rec.status = 0 THEN
|
||||
BEGIN
|
||||
PERFORM @extschema@.compress_chunk( chunk_rec.oid );
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS
|
||||
_message = MESSAGE_TEXT,
|
||||
_detail = PG_EXCEPTION_DETAIL;
|
||||
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
|
||||
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
|
||||
ERRCODE = sqlstate;
|
||||
END;
|
||||
ELSIF
|
||||
(
|
||||
chunk_rec.status & bit_compressed > 0 AND (
|
||||
chunk_rec.status & bit_compressed_unordered > 0 OR
|
||||
chunk_rec.status & bit_compressed_partial > 0
|
||||
)
|
||||
) AND recompress_enabled IS TRUE THEN
|
||||
BEGIN
|
||||
PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
|
||||
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
|
||||
ERRCODE = sqlstate;
|
||||
END;
|
||||
-- SET LOCAL is only active until end of transaction.
|
||||
-- While we could use SET at the start of the function we do not
|
||||
-- want to bleed out search_path to caller, so we do SET LOCAL
|
||||
-- again after COMMIT
|
||||
BEGIN
|
||||
PERFORM @extschema@.compress_chunk(chunk_rec.oid);
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
|
||||
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
|
||||
ERRCODE = sqlstate;
|
||||
END;
|
||||
END IF;
|
||||
COMMIT;
|
||||
-- SET LOCAL is only active until end of transaction.
|
||||
-- While we could use SET at the start of the function we do not
|
||||
-- want to bleed out search_path to caller, so we do SET LOCAL
|
||||
-- again after COMMIT
|
||||
SET LOCAL search_path TO pg_catalog, pg_temp;
|
||||
IF verbose_log THEN
|
||||
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
|
||||
END IF;
|
||||
numchunks := numchunks + 1;
|
||||
IF maxchunks > 0 AND numchunks >= maxchunks THEN
|
||||
EXIT;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END;
|
||||
$$ LANGUAGE PLPGSQL;
|
||||
|
@ -96,6 +96,21 @@ policy_compression_get_compress_after_interval(const Jsonb *config)
|
||||
return interval;
|
||||
}
|
||||
|
||||
Interval *
|
||||
policy_compression_get_compress_created_before_interval(const Jsonb *config)
|
||||
{
|
||||
Interval *interval =
|
||||
ts_jsonb_get_interval_field(config, POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE);
|
||||
|
||||
if (interval == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not find %s in config for job",
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE)));
|
||||
|
||||
return interval;
|
||||
}
|
||||
|
||||
int64
|
||||
policy_recompression_get_recompress_after_int(const Jsonb *config)
|
||||
{
|
||||
@ -184,7 +199,8 @@ policy_compression_check(PG_FUNCTION_ARGS)
|
||||
/* compression policies are added to hypertables or continuous aggregates */
|
||||
Datum
|
||||
policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
Oid compress_after_type, Interval *default_schedule_interval,
|
||||
Oid compress_after_type, Interval *created_before,
|
||||
Interval *default_schedule_interval,
|
||||
bool user_defined_schedule_interval, bool if_not_exists,
|
||||
bool fixed_schedule, TimestampTz initial_start,
|
||||
const char *timezone)
|
||||
@ -201,6 +217,13 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
hcache = ts_hypertable_cache_pin();
|
||||
hypertable = validate_compress_chunks_hypertable(hcache, user_rel_oid, &is_cagg);
|
||||
|
||||
/* creation time usage not supported with caggs yet */
|
||||
if (is_cagg && created_before != NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot use \"compress_created_before\" with continuous aggregate \"%s\" ",
|
||||
get_rel_name(user_rel_oid))));
|
||||
|
||||
owner_id = ts_hypertable_permissions_check(user_rel_oid, GetUserId());
|
||||
ts_bgw_job_validate_job_owner(owner_id);
|
||||
|
||||
@ -214,6 +237,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
|
||||
if (jobs != NIL)
|
||||
{
|
||||
bool is_equal = false;
|
||||
|
||||
if (!if_not_exists)
|
||||
{
|
||||
ts_cache_release(hcache);
|
||||
@ -227,11 +252,25 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
Assert(list_length(jobs) == 1);
|
||||
BgwJob *existing = linitial(jobs);
|
||||
|
||||
if (policy_config_check_hypertable_lag_equality(existing->fd.config,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
|
||||
partitioning_type,
|
||||
compress_after_type,
|
||||
compress_after_datum))
|
||||
if (OidIsValid(compress_after_type))
|
||||
is_equal =
|
||||
policy_config_check_hypertable_lag_equality(existing->fd.config,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
|
||||
partitioning_type,
|
||||
compress_after_type,
|
||||
compress_after_datum);
|
||||
else
|
||||
{
|
||||
Assert(created_before != NULL);
|
||||
is_equal = policy_config_check_hypertable_lag_equality(
|
||||
existing->fd.config,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
|
||||
partitioning_type,
|
||||
INTERVALOID,
|
||||
IntervalPGetDatum(created_before));
|
||||
}
|
||||
|
||||
if (is_equal)
|
||||
{
|
||||
/* If all arguments are the same, do nothing */
|
||||
ts_cache_release(hcache);
|
||||
@ -251,6 +290,22 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
PG_RETURN_INT32(-1);
|
||||
}
|
||||
}
|
||||
|
||||
if (created_before)
|
||||
{
|
||||
Assert(!OidIsValid(compress_after_type));
|
||||
compress_after_type = INTERVALOID;
|
||||
}
|
||||
|
||||
if (!is_cagg && IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(compress_after_type) &&
|
||||
created_before == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid value for parameter %s", POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER),
|
||||
errhint("Integer duration in \"compress_after\" or interval time duration"
|
||||
" in \"compress_created_before\" is required for hypertables with integer "
|
||||
"time dimension.")));
|
||||
|
||||
if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)) &&
|
||||
!user_defined_schedule_interval)
|
||||
{
|
||||
@ -286,9 +341,14 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
switch (compress_after_type)
|
||||
{
|
||||
case INTERVALOID:
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
|
||||
DatumGetIntervalP(compress_after_datum));
|
||||
if (created_before)
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
|
||||
created_before);
|
||||
else
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
|
||||
DatumGetIntervalP(compress_after_datum));
|
||||
break;
|
||||
case INT2OID:
|
||||
ts_jsonb_add_int64(parse_state,
|
||||
@ -360,7 +420,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
|
||||
* The function is not STRICT but we can't allow required args to be NULL
|
||||
* so we need to act like a strict function in those cases
|
||||
*/
|
||||
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
|
||||
if (PG_ARGISNULL(0) || PG_ARGISNULL(2))
|
||||
{
|
||||
ts_feature_flag_check(FEATURE_POLICY);
|
||||
PG_RETURN_NULL();
|
||||
@ -368,7 +428,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
|
||||
|
||||
Oid user_rel_oid = PG_GETARG_OID(0);
|
||||
Datum compress_after_datum = PG_GETARG_DATUM(1);
|
||||
Oid compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
Oid compress_after_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
bool if_not_exists = PG_GETARG_BOOL(2);
|
||||
bool user_defined_schedule_interval = !(PG_ARGISNULL(3));
|
||||
Interval *default_schedule_interval =
|
||||
@ -378,10 +438,18 @@ policy_compression_add(PG_FUNCTION_ARGS)
|
||||
bool fixed_schedule = !PG_ARGISNULL(4);
|
||||
text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
|
||||
char *valid_timezone = NULL;
|
||||
Interval *created_before = PG_GETARG_INTERVAL_P(6);
|
||||
|
||||
ts_feature_flag_check(FEATURE_POLICY);
|
||||
TS_PREVENT_FUNC_IF_READ_ONLY();
|
||||
|
||||
/* compress_after and created_before cannot be specified [or omitted] together */
|
||||
if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6)))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg(
|
||||
"need to specify one of \"compress_after\" or \"compress_created_before\"")));
|
||||
|
||||
/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
|
||||
if (fixed_schedule)
|
||||
{
|
||||
@ -397,6 +465,7 @@ policy_compression_add(PG_FUNCTION_ARGS)
|
||||
retval = policy_compression_add_internal(user_rel_oid,
|
||||
compress_after_datum,
|
||||
compress_after_type,
|
||||
created_before,
|
||||
default_schedule_interval,
|
||||
user_defined_schedule_interval,
|
||||
if_not_exists,
|
||||
|
@ -21,12 +21,14 @@ extern Datum policy_compression_check(PG_FUNCTION_ARGS);
|
||||
int32 policy_compression_get_hypertable_id(const Jsonb *config);
|
||||
int64 policy_compression_get_compress_after_int(const Jsonb *config);
|
||||
Interval *policy_compression_get_compress_after_interval(const Jsonb *config);
|
||||
Interval *policy_compression_get_compress_created_before_interval(const Jsonb *config);
|
||||
int32 policy_compression_get_maxchunks_per_job(const Jsonb *config);
|
||||
int64 policy_recompression_get_recompress_after_int(const Jsonb *config);
|
||||
Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config);
|
||||
|
||||
Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
|
||||
Oid compress_after_type, Interval *default_schedule_interval,
|
||||
Oid compress_after_type, Interval *created_before,
|
||||
Interval *default_schedule_interval,
|
||||
bool user_defined_schedule_interval, bool if_not_exists,
|
||||
bool fixed_schedule, TimestampTz initial_start,
|
||||
const char *timezone);
|
||||
|
@ -73,9 +73,10 @@ log_retention_boundary(int elevel, PolicyRetentionData *policy_data, const char
|
||||
|
||||
if (OidIsValid(outfuncid))
|
||||
elog(elevel,
|
||||
"%s \"%s\": dropping data older than %s",
|
||||
"%s \"%s\": dropping data %s %s",
|
||||
message,
|
||||
relname,
|
||||
policy_data->use_creation_time ? "created before" : "older than",
|
||||
DatumGetCString(OidFunctionCall1(outfuncid, boundary)));
|
||||
}
|
||||
|
||||
@ -294,7 +295,8 @@ policy_retention_execute(int32 job_id, Jsonb *config)
|
||||
|
||||
chunk_invoke_drop_chunks(policy_data.object_relid,
|
||||
policy_data.boundary,
|
||||
policy_data.boundary_type);
|
||||
policy_data.boundary_type,
|
||||
policy_data.use_creation_time);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -309,6 +311,9 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
|
||||
Datum boundary;
|
||||
Datum boundary_type;
|
||||
ContinuousAgg *cagg;
|
||||
Interval *(*interval_getter)(const Jsonb *);
|
||||
interval_getter = policy_retention_get_drop_after_interval;
|
||||
bool use_creation_time = false;
|
||||
|
||||
object_relid = ts_hypertable_id_to_relid(policy_retention_get_hypertable_id(config), false);
|
||||
hypertable = ts_hypertable_cache_get_cache_and_entry(object_relid, CACHE_FLAG_NONE, &hcache);
|
||||
@ -329,14 +334,14 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
|
||||
|
||||
/* if there's no int_now function the boundary is considered as an INTERVAL */
|
||||
boundary_type = INTERVALOID;
|
||||
interval_getter = policy_retention_get_drop_created_before_interval;
|
||||
use_creation_time = true;
|
||||
}
|
||||
else
|
||||
boundary_type = ts_dimension_get_partition_type(open_dim);
|
||||
|
||||
boundary = get_window_boundary(open_dim,
|
||||
config,
|
||||
policy_retention_get_drop_after_int,
|
||||
policy_retention_get_drop_after_interval);
|
||||
boundary =
|
||||
get_window_boundary(open_dim, config, policy_retention_get_drop_after_int, interval_getter);
|
||||
|
||||
/* We need to do a reverse lookup here since the given hypertable might be
|
||||
a materialized hypertable, and thus need to call drop_chunks on the
|
||||
@ -356,6 +361,7 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
|
||||
policy_data->object_relid = object_relid;
|
||||
policy_data->boundary = boundary;
|
||||
policy_data->boundary_type = boundary_type;
|
||||
policy_data->use_creation_time = use_creation_time;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,6 +29,7 @@ typedef struct PolicyRetentionData
|
||||
Oid object_relid;
|
||||
Datum boundary;
|
||||
Datum boundary_type;
|
||||
bool use_creation_time;
|
||||
} PolicyRetentionData;
|
||||
|
||||
typedef struct PolicyContinuousAggData
|
||||
|
@ -219,6 +219,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
|
||||
policy_compression_add_internal(all_policies.rel_oid,
|
||||
all_policies.compress->compress_after,
|
||||
all_policies.compress->compress_after_type,
|
||||
NULL,
|
||||
DEFAULT_COMPRESSION_SCHEDULE_INTERVAL,
|
||||
false,
|
||||
if_exists,
|
||||
@ -234,6 +235,7 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
|
||||
policy_retention_add_internal(all_policies.rel_oid,
|
||||
all_policies.retention->drop_after_type,
|
||||
all_policies.retention->drop_after,
|
||||
NULL,
|
||||
(Interval) DEFAULT_RETENTION_SCHEDULE_INTERVAL,
|
||||
false,
|
||||
false,
|
||||
@ -702,6 +704,7 @@ policies_show(PG_FUNCTION_ARGS)
|
||||
job,
|
||||
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
|
||||
SHOW_POLICY_KEY_COMPRESS_AFTER);
|
||||
/* POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE not supported with caggs */
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
SHOW_POLICY_KEY_COMPRESS_INTERVAL,
|
||||
&(job->fd.schedule_interval));
|
||||
@ -714,6 +717,7 @@ policies_show(PG_FUNCTION_ARGS)
|
||||
job,
|
||||
POL_RETENTION_CONF_KEY_DROP_AFTER,
|
||||
SHOW_POLICY_KEY_DROP_AFTER);
|
||||
/* POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE not supported with caggs */
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
SHOW_POLICY_KEY_RETENTION_INTERVAL,
|
||||
&(job->fd.schedule_interval));
|
||||
|
@ -23,6 +23,7 @@
|
||||
#define POL_COMPRESSION_CONF_KEY_HYPERTABLE_ID "hypertable_id"
|
||||
#define POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER "compress_after"
|
||||
#define POL_COMPRESSION_CONF_KEY_MAXCHUNKS_TO_COMPRESS "maxchunks_to_compress"
|
||||
#define POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE "compress_created_before"
|
||||
|
||||
#define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
|
||||
#define POL_RECOMPRESSION_CONF_KEY_RECOMPRESS_AFTER "recompress_after"
|
||||
@ -31,6 +32,7 @@
|
||||
#define POLICY_RETENTION_CHECK_NAME "policy_retention_check"
|
||||
#define POL_RETENTION_CONF_KEY_HYPERTABLE_ID "hypertable_id"
|
||||
#define POL_RETENTION_CONF_KEY_DROP_AFTER "drop_after"
|
||||
#define POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE "drop_created_before"
|
||||
|
||||
#define SHOW_POLICY_KEY_HYPERTABLE_ID "hypertable_id"
|
||||
#define SHOW_POLICY_KEY_POLICY_NAME "policy_name"
|
||||
@ -38,8 +40,10 @@
|
||||
#define SHOW_POLICY_KEY_REFRESH_START_OFFSET "refresh_start_offset"
|
||||
#define SHOW_POLICY_KEY_REFRESH_END_OFFSET "refresh_end_offset"
|
||||
#define SHOW_POLICY_KEY_COMPRESS_AFTER POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER
|
||||
#define SHOW_POLICY_KEY_COMPRESS_CREATED_BEFORE POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE
|
||||
#define SHOW_POLICY_KEY_COMPRESS_INTERVAL "compress_interval"
|
||||
#define SHOW_POLICY_KEY_DROP_AFTER POL_RETENTION_CONF_KEY_DROP_AFTER
|
||||
#define SHOW_POLICY_KEY_DROP_CREATED_BEFORE POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE
|
||||
#define SHOW_POLICY_KEY_RETENTION_INTERVAL "retention_interval"
|
||||
|
||||
#define DEFAULT_RETENTION_SCHEDULE_INTERVAL \
|
||||
|
@ -32,7 +32,7 @@ bool
|
||||
policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
|
||||
Oid partitioning_type, Oid lag_type, Datum lag_datum)
|
||||
{
|
||||
if (IS_INTEGER_TYPE(partitioning_type))
|
||||
if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID)
|
||||
{
|
||||
bool found;
|
||||
int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found);
|
||||
|
@ -101,6 +101,21 @@ policy_retention_get_drop_after_interval(const Jsonb *config)
|
||||
return interval;
|
||||
}
|
||||
|
||||
Interval *
|
||||
policy_retention_get_drop_created_before_interval(const Jsonb *config)
|
||||
{
|
||||
Interval *interval =
|
||||
ts_jsonb_get_interval_field(config, POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE);
|
||||
|
||||
if (interval == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not find %s in config for job",
|
||||
POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE)));
|
||||
|
||||
return interval;
|
||||
}
|
||||
|
||||
static Hypertable *
|
||||
validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid)
|
||||
{
|
||||
@ -151,8 +166,9 @@ validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid)
|
||||
|
||||
Datum
|
||||
policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
|
||||
Interval default_schedule_interval, bool if_not_exists,
|
||||
bool fixed_schedule, TimestampTz initial_start, const char *timezone)
|
||||
Interval *created_before, Interval default_schedule_interval,
|
||||
bool if_not_exists, bool fixed_schedule, TimestampTz initial_start,
|
||||
const char *timezone)
|
||||
{
|
||||
NameData application_name;
|
||||
int32 job_id;
|
||||
@ -183,9 +199,10 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
|
||||
List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_RETENTION_PROC_NAME,
|
||||
FUNCTIONS_SCHEMA_NAME,
|
||||
hypertable->fd.id);
|
||||
|
||||
if (jobs != NIL)
|
||||
{
|
||||
bool is_equal = false;
|
||||
|
||||
if (!if_not_exists)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DUPLICATE_OBJECT),
|
||||
@ -195,11 +212,25 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
|
||||
Assert(list_length(jobs) == 1);
|
||||
BgwJob *existing = linitial(jobs);
|
||||
|
||||
if (policy_config_check_hypertable_lag_equality(existing->fd.config,
|
||||
POL_RETENTION_CONF_KEY_DROP_AFTER,
|
||||
partitioning_type,
|
||||
window_type,
|
||||
window_datum))
|
||||
if (OidIsValid(window_type))
|
||||
is_equal =
|
||||
policy_config_check_hypertable_lag_equality(existing->fd.config,
|
||||
POL_RETENTION_CONF_KEY_DROP_AFTER,
|
||||
partitioning_type,
|
||||
window_type,
|
||||
window_datum);
|
||||
else
|
||||
{
|
||||
Assert(created_before != NULL);
|
||||
is_equal = policy_config_check_hypertable_lag_equality(
|
||||
existing->fd.config,
|
||||
POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
|
||||
partitioning_type,
|
||||
INTERVALOID,
|
||||
IntervalPGetDatum(created_before));
|
||||
}
|
||||
|
||||
if (is_equal)
|
||||
{
|
||||
/* If all arguments are the same, do nothing */
|
||||
ts_cache_release(hcache);
|
||||
@ -220,12 +251,28 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
|
||||
}
|
||||
}
|
||||
|
||||
if (IS_INTEGER_TYPE(partitioning_type) && !IS_INTEGER_TYPE(window_type))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER),
|
||||
errhint("Integer time duration is required for hypertables"
|
||||
" with integer time dimension.")));
|
||||
if (created_before)
|
||||
{
|
||||
Assert(!OidIsValid(window_type));
|
||||
window_type = INTERVALOID;
|
||||
}
|
||||
|
||||
if (IS_INTEGER_TYPE(partitioning_type))
|
||||
{
|
||||
ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(ht_oid);
|
||||
|
||||
if ((IS_INTEGER_TYPE(window_type) && cagg == NULL &&
|
||||
!OidIsValid(ts_get_integer_now_func(dim, false))) ||
|
||||
(!IS_INTEGER_TYPE(window_type) && created_before == NULL))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER),
|
||||
errhint(
|
||||
"Integer duration in \"drop_after\" with valid \"integer_now\" function"
|
||||
" or interval time duration"
|
||||
" in \"drop_created_before\" is required for hypertables with integer "
|
||||
"time dimension.")));
|
||||
}
|
||||
|
||||
if (IS_TIMESTAMP_TYPE(partitioning_type) && window_type != INTERVALOID)
|
||||
ereport(ERROR,
|
||||
@ -242,9 +289,14 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
|
||||
switch (window_type)
|
||||
{
|
||||
case INTERVALOID:
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
POL_RETENTION_CONF_KEY_DROP_AFTER,
|
||||
DatumGetIntervalP(window_datum));
|
||||
if (created_before)
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
|
||||
created_before);
|
||||
else
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
POL_RETENTION_CONF_KEY_DROP_AFTER,
|
||||
DatumGetIntervalP(window_datum));
|
||||
break;
|
||||
case INT2OID:
|
||||
ts_jsonb_add_int64(parse_state,
|
||||
@ -306,7 +358,7 @@ Datum
|
||||
policy_retention_add(PG_FUNCTION_ARGS)
|
||||
{
|
||||
/* behave like a strict function */
|
||||
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
|
||||
if (PG_ARGISNULL(0) || PG_ARGISNULL(2))
|
||||
PG_RETURN_NULL();
|
||||
|
||||
Oid ht_oid = PG_GETARG_OID(0);
|
||||
@ -319,11 +371,20 @@ policy_retention_add(PG_FUNCTION_ARGS)
|
||||
bool fixed_schedule = !PG_ARGISNULL(4);
|
||||
text *timezone = PG_ARGISNULL(5) ? NULL : PG_GETARG_TEXT_PP(5);
|
||||
char *valid_timezone = NULL;
|
||||
// Interval *created_before = PG_ARGISNULL(6) ? NULL: PG_GETARG_INTERVAL_P(6);
|
||||
Interval *created_before = PG_GETARG_INTERVAL_P(6);
|
||||
|
||||
ts_feature_flag_check(FEATURE_POLICY);
|
||||
TS_PREVENT_FUNC_IF_READ_ONLY();
|
||||
|
||||
Datum retval;
|
||||
|
||||
/* drop_after and created_before cannot be specified [or omitted] together */
|
||||
if ((PG_ARGISNULL(1) && PG_ARGISNULL(6)) || (!PG_ARGISNULL(1) && !PG_ARGISNULL(6)))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("need to specify one of \"drop_after\" or \"drop_created_before\"")));
|
||||
|
||||
/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
|
||||
if (fixed_schedule)
|
||||
{
|
||||
@ -338,6 +399,7 @@ policy_retention_add(PG_FUNCTION_ARGS)
|
||||
retval = policy_retention_add_internal(ht_oid,
|
||||
window_type,
|
||||
window_datum,
|
||||
created_before,
|
||||
default_schedule_interval,
|
||||
if_not_exists,
|
||||
fixed_schedule,
|
||||
|
@ -18,10 +18,11 @@ extern Datum policy_retention_remove(PG_FUNCTION_ARGS);
|
||||
int32 policy_retention_get_hypertable_id(const Jsonb *config);
|
||||
int64 policy_retention_get_drop_after_int(const Jsonb *config);
|
||||
Interval *policy_retention_get_drop_after_interval(const Jsonb *config);
|
||||
Interval *policy_retention_get_drop_created_before_interval(const Jsonb *config);
|
||||
|
||||
Datum policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
|
||||
Interval default_schedule_interval, bool if_not_exists,
|
||||
bool fixed_schedule, TimestampTz initial_start,
|
||||
const char *timezone);
|
||||
Interval *created_before, Interval default_schedule_interval,
|
||||
bool if_not_exists, bool fixed_schedule,
|
||||
TimestampTz initial_start, const char *timezone);
|
||||
Datum policy_retention_remove_internal(Oid table_oid, bool if_exists);
|
||||
#endif /* TIMESCALEDB_TSL_BGW_POLICY_RETENTION_API_H */
|
||||
|
@ -308,7 +308,7 @@ chunk_set_default_data_node(PG_FUNCTION_ARGS)
|
||||
* Returns the number of dropped chunks.
|
||||
*/
|
||||
int
|
||||
chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type)
|
||||
chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type, bool use_creation_time)
|
||||
{
|
||||
EState *estate;
|
||||
ExprContext *econtext;
|
||||
@ -318,27 +318,26 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type)
|
||||
SetExprState *state;
|
||||
Oid restype;
|
||||
Oid func_oid;
|
||||
Const *argarr[DROP_CHUNKS_NARGS] = {
|
||||
makeConst(REGCLASSOID,
|
||||
-1,
|
||||
InvalidOid,
|
||||
sizeof(relid),
|
||||
ObjectIdGetDatum(relid),
|
||||
false,
|
||||
false),
|
||||
makeConst(older_than_type,
|
||||
-1,
|
||||
InvalidOid,
|
||||
get_typlen(older_than_type),
|
||||
older_than,
|
||||
false,
|
||||
get_typbyval(older_than_type)),
|
||||
makeNullConst(older_than_type, -1, InvalidOid),
|
||||
castNode(Const, makeBoolConst(false, true)),
|
||||
/* For now, till we actually support created_before/created_after later */
|
||||
makeNullConst(older_than_type, -1, InvalidOid),
|
||||
makeNullConst(older_than_type, -1, InvalidOid),
|
||||
};
|
||||
Const *TypeNullCons = makeNullConst(older_than_type, -1, InvalidOid);
|
||||
Const *IntervalVal = makeConst(older_than_type,
|
||||
-1,
|
||||
InvalidOid,
|
||||
get_typlen(older_than_type),
|
||||
older_than,
|
||||
false,
|
||||
get_typbyval(older_than_type));
|
||||
Const *argarr[DROP_CHUNKS_NARGS] = { makeConst(REGCLASSOID,
|
||||
-1,
|
||||
InvalidOid,
|
||||
sizeof(relid),
|
||||
ObjectIdGetDatum(relid),
|
||||
false,
|
||||
false),
|
||||
TypeNullCons,
|
||||
TypeNullCons,
|
||||
castNode(Const, makeBoolConst(false, true)),
|
||||
TypeNullCons,
|
||||
TypeNullCons };
|
||||
Oid type_id[DROP_CHUNKS_NARGS] = { REGCLASSOID, ANYOID, ANYOID, BOOLOID, ANYOID, ANYOID };
|
||||
char *const schema_name = ts_extension_schema_name();
|
||||
List *const fqn = list_make2(makeString(schema_name), makeString(DROP_CHUNKS_FUNCNAME));
|
||||
@ -349,6 +348,12 @@ chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type)
|
||||
func_oid = LookupFuncName(fqn, lengthof(type_id), type_id, false);
|
||||
Assert(func_oid); /* LookupFuncName should not return an invalid OID */
|
||||
|
||||
/* decide whether to use "older_than" or "drop_created_before" */
|
||||
if (use_creation_time)
|
||||
argarr[4] = IntervalVal;
|
||||
else
|
||||
argarr[1] = IntervalVal;
|
||||
|
||||
/* Prepare the function expr with argument list */
|
||||
get_func_result_type(func_oid, &restype, NULL);
|
||||
|
||||
|
@ -18,7 +18,8 @@ extern Datum chunk_freeze_chunk(PG_FUNCTION_ARGS);
|
||||
extern Datum chunk_unfreeze_chunk(PG_FUNCTION_ARGS);
|
||||
extern Datum chunk_drop_stale_chunks(PG_FUNCTION_ARGS);
|
||||
extern void ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array);
|
||||
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);
|
||||
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type,
|
||||
bool use_creation_time);
|
||||
extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS);
|
||||
extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes);
|
||||
|
||||
|
@ -600,6 +600,14 @@ select * from _timescaledb_catalog.dimension;
|
||||
|
||||
alter schema new_public rename to public;
|
||||
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
|
||||
-- test that the behavior is strict when providing NULL required arguments
|
||||
create table test_strict (time timestamptz not null, a int, b int);
|
||||
select create_hypertable('test_strict', 'time');
|
||||
create_hypertable
|
||||
--------------------------
|
||||
(6,public,test_strict,t)
|
||||
(1 row)
|
||||
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_reorder_policy('test_table_perm', 'test_table_perm_pkey');
|
||||
ERROR: must be owner of hypertable "test_table_perm"
|
||||
@ -609,6 +617,8 @@ select add_retention_policy('test_table_perm', INTERVAL '4 months', true);
|
||||
ERROR: must be owner of hypertable "test_table_perm"
|
||||
select remove_retention_policy('test_table');
|
||||
ERROR: must be owner of hypertable "test_table"
|
||||
select add_retention_policy('test_strict', drop_after => NULL);
|
||||
ERROR: need to specify one of "drop_after" or "drop_created_before"
|
||||
\set ON_ERROR_STOP 1
|
||||
-- Check the number of non-telemetry policies. We check for telemetry
|
||||
-- policy in telemetry_community.sql
|
||||
@ -623,21 +633,7 @@ GROUP BY proc_name;
|
||||
policy_retention | 2
|
||||
(3 rows)
|
||||
|
||||
-- test that the behavior is strict when providing NULL required arguments
|
||||
create table test_strict (time timestamptz not null, a int, b int);
|
||||
select create_hypertable('test_strict', 'time');
|
||||
create_hypertable
|
||||
--------------------------
|
||||
(6,public,test_strict,t)
|
||||
(1 row)
|
||||
|
||||
-- test retention with null arguments
|
||||
select add_retention_policy('test_strict', drop_after => NULL);
|
||||
add_retention_policy
|
||||
----------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select add_retention_policy(NULL, NULL);
|
||||
add_retention_policy
|
||||
----------------------
|
||||
@ -665,12 +661,6 @@ select add_retention_policy('test_strict', interval '2 days', schedule_interval
|
||||
|
||||
-- test compression with null arguments
|
||||
alter table test_strict set (timescaledb.compress);
|
||||
select add_compression_policy('test_strict', compress_after => NULL);
|
||||
add_compression_policy
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select add_compression_policy(NULL, compress_after => NULL);
|
||||
add_compression_policy
|
||||
------------------------
|
||||
@ -723,6 +713,8 @@ select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_sche
|
||||
|
||||
-- test policy check functions with NULL args
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compression_policy('test_strict', compress_after => NULL);
|
||||
ERROR: need to specify one of "compress_after" or "compress_created_before"
|
||||
SELECT _timescaledb_functions.policy_compression_check(NULL);
|
||||
ERROR: config must not be NULL
|
||||
SELECT _timescaledb_functions.policy_refresh_continuous_aggregate_check(NULL);
|
||||
|
@ -1145,10 +1145,13 @@ ERROR: setup a refresh policy for "metrics_cagg" before setting up a compressio
|
||||
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset
|
||||
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
|
||||
ERROR: compression not enabled on continuous aggregate "metrics_cagg"
|
||||
\set ON_ERROR_STOP 1
|
||||
ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
|
||||
NOTICE: defaulting compress_segmentby to device_id
|
||||
NOTICE: defaulting compress_orderby to dayb
|
||||
--cannot use compress_created_before with cagg
|
||||
SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ;
|
||||
ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_cagg"
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
|
||||
COMP_JOB
|
||||
----------
|
||||
|
@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
|
||||
ALTER TABLE test_table_smallint SET (timescaledb.compress);
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
|
||||
ERROR: unsupported compress_after argument type, expected type : smallint
|
||||
ERROR: invalid value for parameter compress_after
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
|
||||
SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
|
||||
ALTER TABLE test_table_smallint SET (timescaledb.compress);
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
|
||||
ERROR: unsupported compress_after argument type, expected type : smallint
|
||||
ERROR: invalid value for parameter compress_after
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
|
||||
SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
|
||||
ALTER TABLE test_table_smallint SET (timescaledb.compress);
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
|
||||
ERROR: unsupported compress_after argument type, expected type : smallint
|
||||
ERROR: invalid value for parameter compress_after
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
|
||||
SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
@ -155,7 +155,7 @@ INSERT INTO test_table_smallint SELECT generate_series(1,5), 10;
|
||||
ALTER TABLE test_table_smallint SET (timescaledb.compress);
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compression_policy( 'test_table_smallint', compress_after=> '1 day'::interval );
|
||||
ERROR: unsupported compress_after argument type, expected type : smallint
|
||||
ERROR: invalid value for parameter compress_after
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_compression_policy('test_table_smallint', 2::SMALLINT) AS compressjob_id \gset
|
||||
SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1000 config must have compress_after
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
|
||||
ERROR: job 1000 config must have compress_after or compress_created_before
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
|
||||
SELECT remove_compression_policy('test_table_int');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 config must have hypertable_id
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
|
||||
UPDATE _timescaledb_config.bgw_job
|
||||
SET config = NULL
|
||||
WHERE id = :compressjob_id;
|
||||
@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 has null config
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
|
||||
-- test ADD COLUMN IF NOT EXISTS
|
||||
CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
|
||||
SELECT create_hypertable('metric', 'time', 'dev_id', 10);
|
||||
|
@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1000 config must have compress_after
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
|
||||
ERROR: job 1000 config must have compress_after or compress_created_before
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
|
||||
SELECT remove_compression_policy('test_table_int');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 config must have hypertable_id
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
|
||||
UPDATE _timescaledb_config.bgw_job
|
||||
SET config = NULL
|
||||
WHERE id = :compressjob_id;
|
||||
@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 has null config
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
|
||||
-- test ADD COLUMN IF NOT EXISTS
|
||||
CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
|
||||
SELECT create_hypertable('metric', 'time', 'dev_id', 10);
|
||||
|
@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1000 config must have compress_after
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
|
||||
ERROR: job 1000 config must have compress_after or compress_created_before
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
|
||||
SELECT remove_compression_policy('test_table_int');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 config must have hypertable_id
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
|
||||
UPDATE _timescaledb_config.bgw_job
|
||||
SET config = NULL
|
||||
WHERE id = :compressjob_id;
|
||||
@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 has null config
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
|
||||
-- test ADD COLUMN IF NOT EXISTS
|
||||
CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
|
||||
SELECT create_hypertable('metric', 'time', 'dev_id', 10);
|
||||
|
@ -485,8 +485,8 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1000 config must have compress_after
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 35 at RAISE
|
||||
ERROR: job 1000 config must have compress_after or compress_created_before
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 48 at RAISE
|
||||
SELECT remove_compression_policy('test_table_int');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
@ -508,7 +508,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 config must have hypertable_id
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 26 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 29 at RAISE
|
||||
UPDATE _timescaledb_config.bgw_job
|
||||
SET config = NULL
|
||||
WHERE id = :compressjob_id;
|
||||
@ -521,7 +521,7 @@ SELECT config FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
|
||||
--should fail
|
||||
CALL run_job(:compressjob_id);
|
||||
ERROR: job 1001 has null config
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 21 at RAISE
|
||||
CONTEXT: PL/pgSQL function _timescaledb_functions.policy_compression(integer,jsonb) line 24 at RAISE
|
||||
-- test ADD COLUMN IF NOT EXISTS
|
||||
CREATE TABLE metric (time TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, dev_id INT4 NOT NULL);
|
||||
SELECT create_hypertable('metric', 'time', 'dev_id', 10);
|
||||
|
@ -73,4 +73,113 @@ select count(*) from timescaledb_information.chunks where hypertable_name='test'
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- retention policy
|
||||
INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
|
||||
\set ON_ERROR_STOP 0
|
||||
-- interval input for "drop_after" for INTEGER partitioning errors out
|
||||
SELECT add_retention_policy('test', INTERVAL '5 seconds', true);
|
||||
ERROR: invalid value for parameter drop_after
|
||||
-- integer input for "drop_after" for INTEGER partitioning without valid
|
||||
-- integer_now function errors out
|
||||
SELECT add_retention_policy('test', 2000, true);
|
||||
ERROR: invalid value for parameter drop_after
|
||||
-- both drop_created_before and drop_after should error out
|
||||
SELECT add_retention_policy('test', drop_after => INTERVAL '5 seconds',
|
||||
drop_created_before => INTERVAL '2 seconds');
|
||||
ERROR: need to specify one of "drop_after" or "drop_created_before"
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds',
|
||||
if_not_exists => true) as drop_chunks_job_id \gset
|
||||
CALL run_job(:drop_chunks_job_id);
|
||||
select count(*) from timescaledb_information.chunks where hypertable_name='test';
|
||||
count
|
||||
-------
|
||||
11
|
||||
(1 row)
|
||||
|
||||
SELECT pg_sleep(3);
|
||||
pg_sleep
|
||||
----------
|
||||
|
||||
(1 row)
|
||||
|
||||
CALL run_job(:drop_chunks_job_id);
|
||||
select count(*) from timescaledb_information.chunks where hypertable_name='test';
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- check for WARNING/NOTICE if policy already exists
|
||||
SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds',
|
||||
if_not_exists => true);
|
||||
NOTICE: retention policy already exists for hypertable "test", skipping
|
||||
add_retention_policy
|
||||
----------------------
|
||||
-1
|
||||
(1 row)
|
||||
|
||||
SELECT add_retention_policy('test', drop_created_before => INTERVAL '20 seconds',
|
||||
if_not_exists => true);
|
||||
WARNING: retention policy already exists for hypertable "test"
|
||||
add_retention_policy
|
||||
----------------------
|
||||
-1
|
||||
(1 row)
|
||||
|
||||
SELECT remove_retention_policy('test');
|
||||
remove_retention_policy
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- compression policy
|
||||
ALTER TABLE test SET (timescaledb.compress);
|
||||
INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
|
||||
-- Chunk compression status
|
||||
SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
|
||||
compression_status
|
||||
--------------------
|
||||
Uncompressed
|
||||
(1 row)
|
||||
|
||||
-- Compression policy
|
||||
SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds') AS compress_chunks_job_id \gset
|
||||
SELECT pg_sleep(3);
|
||||
pg_sleep
|
||||
----------
|
||||
|
||||
(1 row)
|
||||
|
||||
CALL run_job(:compress_chunks_job_id);
|
||||
-- Chunk compression status
|
||||
SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
|
||||
compression_status
|
||||
--------------------
|
||||
Compressed
|
||||
(1 row)
|
||||
|
||||
-- check for WARNING/NOTICE if policy already exists
|
||||
SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds',
|
||||
if_not_exists => true);
|
||||
NOTICE: compression policy already exists for hypertable "test", skipping
|
||||
add_compression_policy
|
||||
------------------------
|
||||
-1
|
||||
(1 row)
|
||||
|
||||
SELECT add_compression_policy('test', compress_created_before => INTERVAL '20 seconds',
|
||||
if_not_exists => true);
|
||||
WARNING: compression policy already exists for hypertable "test"
|
||||
add_compression_policy
|
||||
------------------------
|
||||
-1
|
||||
(1 row)
|
||||
|
||||
SELECT remove_compression_policy('test');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test;
|
||||
|
@ -115,7 +115,7 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000 ORDER BY id;
|
||||
select add_retention_policy();
|
||||
ERROR: function add_retention_policy() does not exist at character 8
|
||||
select add_retention_policy('test_table');
|
||||
ERROR: function add_retention_policy(unknown) does not exist at character 8
|
||||
ERROR: need to specify one of "drop_after" or "drop_created_before"
|
||||
select add_retention_policy(INTERVAL '3 hours');
|
||||
ERROR: function add_retention_policy(interval) does not exist at character 8
|
||||
select add_retention_policy('test_table', INTERVAL 'haha');
|
||||
@ -198,6 +198,7 @@ select * from _timescaledb_catalog.dimension;
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER
|
||||
CREATE SCHEMA IF NOT EXISTS my_new_schema;
|
||||
create or replace function my_new_schema.dummy_now2() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 1::BIGINT';
|
||||
grant usage on SCHEMA my_new_schema to public;
|
||||
grant execute on ALL FUNCTIONS IN SCHEMA my_new_schema to public;
|
||||
select set_integer_now_func('test_table_int', 'my_new_schema.dummy_now2');
|
||||
set_integer_now_func
|
||||
|
@ -407,8 +407,8 @@ ERROR: null values cannot be formatted as an SQL identifier
|
||||
CALL _timescaledb_internal.policy_compression(0,NULL);
|
||||
WARNING: procedure _timescaledb_internal.policy_compression(integer,jsonb) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
|
||||
ERROR: job 0 has null config
|
||||
CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true);
|
||||
WARNING: procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
|
||||
CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true,true);
|
||||
WARNING: procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
|
||||
ERROR: invalid hypertable or continuous aggregate
|
||||
CALL _timescaledb_internal.policy_recompression(0,NULL);
|
||||
WARNING: procedure _timescaledb_internal.policy_recompression(integer,jsonb) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.
|
||||
|
@ -114,7 +114,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
|
||||
_timescaledb_functions.ping_data_node(name,interval)
|
||||
_timescaledb_functions.policy_compression(integer,jsonb)
|
||||
_timescaledb_functions.policy_compression_check(jsonb)
|
||||
_timescaledb_functions.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean)
|
||||
_timescaledb_functions.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean)
|
||||
_timescaledb_functions.policy_job_error_retention(integer,jsonb)
|
||||
_timescaledb_functions.policy_job_error_retention_check(jsonb)
|
||||
_timescaledb_functions.policy_recompression(integer,jsonb)
|
||||
@ -225,7 +225,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
|
||||
_timescaledb_internal.ping_data_node(name,interval)
|
||||
_timescaledb_internal.policy_compression(integer,jsonb)
|
||||
_timescaledb_internal.policy_compression_check(jsonb)
|
||||
_timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean)
|
||||
_timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean)
|
||||
_timescaledb_internal.policy_job_error_retention(integer,jsonb)
|
||||
_timescaledb_internal.policy_job_error_retention_check(jsonb)
|
||||
_timescaledb_internal.policy_recompression(integer,jsonb)
|
||||
@ -262,14 +262,14 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
|
||||
debug_waitpoint_enable(text)
|
||||
debug_waitpoint_id(text)
|
||||
debug_waitpoint_release(text)
|
||||
add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text)
|
||||
add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval)
|
||||
add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text)
|
||||
add_data_node(name,text,name,integer,boolean,boolean,text)
|
||||
add_dimension(regclass,_timescaledb_internal.dimension_info,boolean)
|
||||
add_dimension(regclass,name,integer,anyelement,regproc,boolean)
|
||||
add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text)
|
||||
add_reorder_policy(regclass,name,boolean,timestamp with time zone,text)
|
||||
add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text)
|
||||
add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval)
|
||||
alter_data_node(name,text,name,integer,boolean)
|
||||
alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean,regproc,boolean,timestamp with time zone,text)
|
||||
approximate_row_count(regclass)
|
||||
|
@ -100,7 +100,7 @@ CALL _timescaledb_internal.cagg_migrate_execute_override_cagg(NULL,NULL);
|
||||
CALL _timescaledb_internal.cagg_migrate_execute_plan(NULL);
|
||||
CALL _timescaledb_internal.cagg_migrate_execute_refresh_new_cagg(NULL,NULL);
|
||||
CALL _timescaledb_internal.policy_compression(0,NULL);
|
||||
CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true);
|
||||
CALL _timescaledb_internal.policy_compression_execute(0,0,NULL::interval,0,true,true,true);
|
||||
CALL _timescaledb_internal.policy_recompression(0,NULL);
|
||||
CALL _timescaledb_internal.policy_refresh_continuous_aggregate(0,NULL);
|
||||
CALL _timescaledb_internal.policy_reorder(0,NULL);
|
||||
|
@ -313,13 +313,17 @@ select * from _timescaledb_catalog.dimension;
|
||||
alter schema new_public rename to public;
|
||||
|
||||
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
|
||||
-- test that the behavior is strict when providing NULL required arguments
|
||||
create table test_strict (time timestamptz not null, a int, b int);
|
||||
select create_hypertable('test_strict', 'time');
|
||||
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_reorder_policy('test_table_perm', 'test_table_perm_pkey');
|
||||
select remove_reorder_policy('test_table');
|
||||
|
||||
select add_retention_policy('test_table_perm', INTERVAL '4 months', true);
|
||||
select remove_retention_policy('test_table');
|
||||
|
||||
select add_retention_policy('test_strict', drop_after => NULL);
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
-- Check the number of non-telemetry policies. We check for telemetry
|
||||
@ -330,11 +334,7 @@ WHERE proc_name NOT LIKE '%telemetry%'
|
||||
GROUP BY proc_name;
|
||||
|
||||
|
||||
-- test that the behavior is strict when providing NULL required arguments
|
||||
create table test_strict (time timestamptz not null, a int, b int);
|
||||
select create_hypertable('test_strict', 'time');
|
||||
-- test retention with null arguments
|
||||
select add_retention_policy('test_strict', drop_after => NULL);
|
||||
select add_retention_policy(NULL, NULL);
|
||||
select add_retention_policy(NULL, drop_after => interval '2 days');
|
||||
-- this is an optional argument
|
||||
@ -342,7 +342,6 @@ select add_retention_policy('test_strict', drop_after => interval '2 days', if_n
|
||||
select add_retention_policy('test_strict', interval '2 days', schedule_interval => NULL);
|
||||
-- test compression with null arguments
|
||||
alter table test_strict set (timescaledb.compress);
|
||||
select add_compression_policy('test_strict', compress_after => NULL);
|
||||
select add_compression_policy(NULL, compress_after => NULL);
|
||||
select add_compression_policy('test_strict', INTERVAL '2 weeks', if_not_exists => NULL);
|
||||
select add_compression_policy('test_strict', INTERVAL '2 weeks', schedule_interval => NULL);
|
||||
@ -366,6 +365,7 @@ select * from _timescaledb_config.bgw_job where id in (:retenion_id_missing_sche
|
||||
|
||||
-- test policy check functions with NULL args
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compression_policy('test_strict', compress_after => NULL);
|
||||
SELECT _timescaledb_functions.policy_compression_check(NULL);
|
||||
SELECT _timescaledb_functions.policy_refresh_continuous_aggregate_check(NULL);
|
||||
SELECT _timescaledb_functions.policy_reorder_check(NULL);
|
||||
|
@ -540,9 +540,12 @@ SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
|
||||
--can set compression policy only after enabling compression --
|
||||
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset
|
||||
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
|
||||
ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
|
||||
|
||||
--cannot use compress_created_before with cagg
|
||||
SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
|
||||
|
||||
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
|
||||
SELECT remove_compression_policy('metrics_cagg');
|
||||
|
@ -28,4 +28,51 @@ select count(*) from timescaledb_information.chunks where hypertable_name='test'
|
||||
SELECT count(*) from drop_chunks('test', created_after => INTERVAL '1 hour');
|
||||
select count(*) from timescaledb_information.chunks where hypertable_name='test';
|
||||
|
||||
-- retention policy
|
||||
INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
|
||||
\set ON_ERROR_STOP 0
|
||||
-- interval input for "drop_after" for INTEGER partitioning errors out
|
||||
SELECT add_retention_policy('test', INTERVAL '5 seconds', true);
|
||||
-- integer input for "drop_after" for INTEGER partitioning without valid
|
||||
-- integer_now function errors out
|
||||
SELECT add_retention_policy('test', 2000, true);
|
||||
-- both drop_created_before and drop_after should error out
|
||||
SELECT add_retention_policy('test', drop_after => INTERVAL '5 seconds',
|
||||
drop_created_before => INTERVAL '2 seconds');
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds',
|
||||
if_not_exists => true) as drop_chunks_job_id \gset
|
||||
CALL run_job(:drop_chunks_job_id);
|
||||
select count(*) from timescaledb_information.chunks where hypertable_name='test';
|
||||
SELECT pg_sleep(3);
|
||||
CALL run_job(:drop_chunks_job_id);
|
||||
select count(*) from timescaledb_information.chunks where hypertable_name='test';
|
||||
-- check for WARNING/NOTICE if policy already exists
|
||||
SELECT add_retention_policy('test', drop_created_before => INTERVAL '2 seconds',
|
||||
if_not_exists => true);
|
||||
SELECT add_retention_policy('test', drop_created_before => INTERVAL '20 seconds',
|
||||
if_not_exists => true);
|
||||
SELECT remove_retention_policy('test');
|
||||
|
||||
-- compression policy
|
||||
ALTER TABLE test SET (timescaledb.compress);
|
||||
INSERT INTO test SELECT i, i %10, 0.10 FROM generate_series(1, 100, 1) i;
|
||||
|
||||
-- Chunk compression status
|
||||
SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
|
||||
|
||||
-- Compression policy
|
||||
SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds') AS compress_chunks_job_id \gset
|
||||
SELECT pg_sleep(3);
|
||||
CALL run_job(:compress_chunks_job_id);
|
||||
|
||||
-- Chunk compression status
|
||||
SELECT DISTINCT compression_status FROM _timescaledb_internal.compressed_chunk_stats;
|
||||
-- check for WARNING/NOTICE if policy already exists
|
||||
SELECT add_compression_policy('test', compress_created_before => INTERVAL '2 seconds',
|
||||
if_not_exists => true);
|
||||
SELECT add_compression_policy('test', compress_created_before => INTERVAL '20 seconds',
|
||||
if_not_exists => true);
|
||||
SELECT remove_compression_policy('test');
|
||||
|
||||
DROP TABLE test;
|
||||
|
@ -90,6 +90,7 @@ select * from _timescaledb_catalog.dimension;
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS my_new_schema;
|
||||
create or replace function my_new_schema.dummy_now2() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 1::BIGINT';
|
||||
grant usage on SCHEMA my_new_schema to public;
|
||||
grant execute on ALL FUNCTIONS IN SCHEMA my_new_schema to public;
|
||||
select set_integer_now_func('test_table_int', 'my_new_schema.dummy_now2');
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user