1 step CAgg policy management

This simplifies the process of adding the policies
for the CAggs. Now, with a single SQL statement,
all the policies can be added for a given CAgg.
Similarly, all the policies can be removed or modified
via a single SQL statement.

This also adds a new function as well as a view to show all
the policies on a continuous aggregate.
This commit is contained in:
Rafia Sabih 2022-05-04 15:56:14 +05:30 committed by Markos Fountoulakis
parent be429eb3d9
commit bca65f4697
17 changed files with 899 additions and 85 deletions

View File

@ -55,3 +55,48 @@ CREATE OR REPLACE FUNCTION @extschema@.remove_continuous_aggregate_policy(contin
RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_remove'
LANGUAGE C VOLATILE STRICT;
/* 1 step policies */
/* Add policies */
/*
 * add_policies(): create refresh, compression and retention policies for a
 * continuous aggregate in a single call.  A policy is created only for the
 * arguments that are supplied (non-NULL); refresh_schedule_interval is
 * required whenever a refresh offset is given.  The "any" arguments accept
 * an INTERVAL or an integer, matching the cagg's partitioning column type.
 * Returns true when at least one policy was created (C: ts_policies_add).
 */
CREATE OR REPLACE FUNCTION @extschema@.add_policies(
relation REGCLASS,
if_not_exists BOOL = false,
refresh_start_offset "any" = NULL,
refresh_end_offset "any" = NULL,
refresh_schedule_interval INTERVAL = NULL,
compress_after "any" = NULL,
drop_after "any" = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_add'
LANGUAGE C VOLATILE;
/* Remove policies */
/*
 * remove_policies(): drop the named policies from a continuous aggregate.
 * policy_names are the internal proc names, e.g.
 * 'policy_refresh_continuous_aggregate', 'policy_compression',
 * 'policy_retention'.  Returns true when a policy was removed.
 * NOTE(review): the C implementation (ts_policies_remove) treats the second
 * argument as an "if_exists" flag for removal, although it is declared here
 * as if_not_exists -- confirm the intended parameter name.
 */
CREATE OR REPLACE FUNCTION @extschema@.remove_policies(
relation REGCLASS,
if_not_exists BOOL = false,
VARIADIC policy_names TEXT[] = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_remove'
LANGUAGE C VOLATILE;
/* Alter policies */
/*
 * alter_policies(): modify existing policies of a continuous aggregate in a
 * single call.  Only the supplied (non-NULL) arguments are changed; omitted
 * refresh offsets keep their current values from the existing job config.
 * Implemented in C as ts_policies_alter.
 */
CREATE OR REPLACE FUNCTION @extschema@.alter_policies(
relation REGCLASS,
if_not_exists BOOL = false,
refresh_start_offset "any" = NULL,
refresh_end_offset "any" = NULL,
refresh_schedule_interval INTERVAL = NULL,
compress_after "any" = NULL,
drop_after "any" = NULL)
RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_policies_alter'
LANGUAGE C VOLATILE;
/* Show policies info */
/*
 * show_policies(): return one JSONB object per policy configured on the
 * given continuous aggregate (implemented in C as ts_policies_show).
 * NOTE(review): the if_not_exists argument is not read by the C
 * implementation shown in this commit -- confirm whether it is needed.
 */
CREATE OR REPLACE FUNCTION @extschema@.show_policies(
relation REGCLASS,
if_not_exists BOOL = false)
RETURNS SETOF JSONB
AS '@MODULE_PATHNAME@', 'ts_policies_show'
LANGUAGE C VOLATILE;

View File

@ -51,6 +51,11 @@ CROSSMODULE_WRAPPER(copy_chunk_proc);
CROSSMODULE_WRAPPER(copy_chunk_cleanup_proc);
CROSSMODULE_WRAPPER(subscription_exec);
CROSSMODULE_WRAPPER(policies_add);
CROSSMODULE_WRAPPER(policies_remove);
CROSSMODULE_WRAPPER(policies_alter);
CROSSMODULE_WRAPPER(policies_show);
/* partialize/finalize aggregate */
CROSSMODULE_WRAPPER(partialize_agg);
CROSSMODULE_WRAPPER(finalize_agg_sfunc);
@ -404,6 +409,11 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.subscription_exec = error_no_default_fn_pg_community,
.reorder_chunk = error_no_default_fn_pg_community,
.policies_add = error_no_default_fn_pg_community,
.policies_remove = error_no_default_fn_pg_community,
.policies_alter = error_no_default_fn_pg_community,
.policies_show = error_no_default_fn_pg_community,
.partialize_agg = error_no_default_fn_pg_community,
.finalize_agg_sfunc = error_no_default_fn_pg_community,
.finalize_agg_ffunc = error_no_default_fn_pg_community,

View File

@ -53,6 +53,11 @@ typedef struct CrossModuleFunctions
PGFunction policy_retention_proc;
PGFunction policy_retention_remove;
PGFunction policies_add;
PGFunction policies_remove;
PGFunction policies_alter;
PGFunction policies_show;
PGFunction job_add;
PGFunction job_alter;
PGFunction job_alter_set_hypertable_id;

View File

@ -125,6 +125,14 @@ typedef struct CaggsInfoData
List *bucket_functions;
} CaggsInfo;
/*
 * One parsed offset argument (start or end) of a continuous aggregate
 * refresh policy, together with its resolved type and NULL-ness.
 */
typedef struct CaggPolicyOffset
{
Datum value; /* offset value as passed by the caller */
Oid type; /* resolved type of the polymorphic "any" argument */
bool isnull; /* true when the argument was SQL NULL */
const char *name; /* config key for this offset, e.g. "start_offset" */
} CaggPolicyOffset;
extern TSDLLEXPORT const CaggsInfo ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id);
extern TSDLLEXPORT void ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids,
ArrayType *bucket_widths,

View File

@ -5,6 +5,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/job_api.c
${CMAKE_CURRENT_SOURCE_DIR}/reorder_api.c
${CMAKE_CURRENT_SOURCE_DIR}/retention_api.c
${CMAKE_CURRENT_SOURCE_DIR}/policy_utils.c)
${CMAKE_CURRENT_SOURCE_DIR}/policy_utils.c
${CMAKE_CURRENT_SOURCE_DIR}/policies_v2.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
target_include_directories(${TSL_LIBRARY_NAME} PRIVATE ${CMAKE_SOURCE_DIR})

View File

@ -19,6 +19,7 @@
#include "bgw/job.h"
#include "bgw_policy/job.h"
#include "bgw_policy/continuous_aggregate_api.h"
#include "bgw_policy/policies_v2.h"
/*
* Default scheduled interval for compress jobs = default chunk length.
@ -37,10 +38,8 @@
#define DEFAULT_RETRY_PERIOD \
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 hour"), InvalidOid, -1))
#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
#define CONFIG_KEY_HYPERTABLE_ID "hypertable_id"
#define CONFIG_KEY_COMPRESS_AFTER "compress_after"
#define CONFIG_KEY_RECOMPRESS_AFTER "recompress_after"
#define CONFIG_KEY_RECOMPRESS "recompress"
#define CONFIG_KEY_MAXCHUNKS_TO_COMPRESS "maxchunks_to_compress"
@ -168,9 +167,9 @@ validate_compress_after_type(Oid partitioning_type, Oid compress_after_type)
}
}
/* compression policies are added to hypertables or continuous aggregates */
Datum
policy_compression_add(PG_FUNCTION_ARGS)
policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Oid compress_after_type, bool if_not_exists)
{
/* The function is not STRICT but we can't allow required args to be NULL
* so we need to act like a strict function in those cases */
@ -193,8 +192,6 @@ policy_compression_add(PG_FUNCTION_ARGS)
Oid owner_id;
bool is_cagg = false;
TS_PREVENT_FUNC_IF_READ_ONLY();
hcache = ts_hypertable_cache_pin();
hypertable = validate_compress_chunks_hypertable(hcache, user_rel_oid, &is_cagg);
@ -330,17 +327,29 @@ policy_compression_add(PG_FUNCTION_ARGS)
PG_RETURN_INT32(job_id);
}
/* remove compression policy from ht or cagg */
/* compression policies are added to hypertables or continuous aggregates */
Datum
policy_compression_remove(PG_FUNCTION_ARGS)
policy_compression_add(PG_FUNCTION_ARGS)
{
Oid user_rel_oid = PG_GETARG_OID(0);
bool if_exists = PG_GETARG_BOOL(1);
Hypertable *ht;
Cache *hcache;
Datum compress_after_datum = PG_GETARG_DATUM(1);
Oid compress_after_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
bool if_not_exists = PG_GETARG_BOOL(2);
TS_PREVENT_FUNC_IF_READ_ONLY();
return policy_compression_add_internal(user_rel_oid,
compress_after_datum,
compress_after_type,
if_not_exists);
}
bool
policy_compression_remove_internal(Oid user_rel_oid, bool if_exists)
{
Hypertable *ht;
Cache *hcache;
ht = ts_hypertable_cache_get_cache_and_entry(user_rel_oid, CACHE_FLAG_MISSING_OK, &hcache);
if (!ht)
{
@ -395,6 +404,18 @@ policy_compression_remove(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(true);
}
/*
 * SQL-callable wrapper: drop the compression policy from a hypertable or
 * continuous aggregate.  All work is delegated to
 * policy_compression_remove_internal(), shared with the 1-step policy API.
 */
Datum
policy_compression_remove(PG_FUNCTION_ARGS)
{
	TS_PREVENT_FUNC_IF_READ_ONLY();

	return policy_compression_remove_internal(PG_GETARG_OID(0), PG_GETARG_BOOL(1));
}
/* compare cagg job config with compression job config. If there is an overlap, then
* throw an error. We do this since we cannot refresh compressed
* regions. We do not want cont. aggregate jobs to fail

View File

@ -25,4 +25,8 @@ int32 policy_compression_get_maxchunks_per_job(const Jsonb *config);
int64 policy_recompression_get_recompress_after_int(const Jsonb *config);
Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config);
Datum policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
Oid compress_after_type, bool if_not_exists);
bool policy_compression_remove_internal(Oid user_rel_oid, bool if_exists);
#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H */

View File

@ -24,11 +24,7 @@
#include "time_utils.h"
#include "policy_utils.h"
#include "time_utils.h"
#define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate"
#define CONFIG_KEY_MAT_HYPERTABLE_ID "mat_hypertable_id"
#define CONFIG_KEY_START_OFFSET "start_offset"
#define CONFIG_KEY_END_OFFSET "end_offset"
#include "bgw_policy/policies_v2.h"
/* Default max runtime for a continuous aggregate jobs is unlimited for now */
#define DEFAULT_MAX_RUNTIME \
@ -339,21 +335,6 @@ convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const cha
return converted;
}
typedef struct CaggPolicyOffset
{
Datum value;
Oid type;
bool isnull;
const char *name;
} CaggPolicyOffset;
typedef struct CaggPolicyConfig
{
Oid partition_type;
CaggPolicyOffset offset_start;
CaggPolicyOffset offset_end;
} CaggPolicyConfig;
/*
* Convert an interval to a 128 integer value.
*
@ -488,23 +469,22 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
}
static void
parse_offset_arg(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo, CaggPolicyOffset *offset,
int argnum)
parse_offset_arg(const ContinuousAgg *cagg, Oid offset_type, NullableDatum arg,
CaggPolicyOffset *offset)
{
offset->isnull = PG_ARGISNULL(argnum);
offset->isnull = arg.isnull;
if (!offset->isnull)
{
Oid type = get_fn_expr_argtype(fcinfo->flinfo, argnum);
Datum arg = PG_GETARG_DATUM(argnum);
offset->value = convert_interval_arg(cagg->partition_type, arg, &type, offset->name);
offset->type = type;
offset->value =
convert_interval_arg(cagg->partition_type, arg.value, &offset_type, offset->name);
offset->type = offset_type;
}
}
static void
parse_cagg_policy_config(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo,
parse_cagg_policy_config(const ContinuousAgg *cagg, Oid start_offset_type,
NullableDatum start_offset, Oid end_offset_type, NullableDatum end_offset,
CaggPolicyConfig *config)
{
MemSet(config, 0, sizeof(CaggPolicyConfig));
@ -518,29 +498,28 @@ parse_cagg_policy_config(const ContinuousAgg *cagg, const FunctionCallInfo fcinf
config->offset_start.name = CONFIG_KEY_START_OFFSET;
config->offset_end.name = CONFIG_KEY_END_OFFSET;
parse_offset_arg(cagg, fcinfo, &config->offset_start, 1);
parse_offset_arg(cagg, fcinfo, &config->offset_end, 2);
parse_offset_arg(cagg, start_offset_type, start_offset, &config->offset_start);
parse_offset_arg(cagg, end_offset_type, end_offset, &config->offset_end);
Assert(config->offset_start.type == config->offset_end.type);
validate_window_size(cagg, config);
}
Datum
policy_refresh_cagg_add(PG_FUNCTION_ARGS)
policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDatum start_offset,
Oid end_offset_type, NullableDatum end_offset,
Interval refresh_interval, bool if_not_exists)
{
NameData application_name;
NameData proc_name, proc_schema, owner;
ContinuousAgg *cagg;
CaggPolicyConfig policyconf;
int32 job_id;
Interval refresh_interval;
Oid cagg_oid, owner_id;
Oid owner_id;
List *jobs;
JsonbParseState *parse_state = NULL;
bool if_not_exists;
/* Verify that the owner can create a background worker */
cagg_oid = PG_GETARG_OID(0);
owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
ts_bgw_job_validate_job_owner(owner_id);
@ -550,15 +529,12 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
parse_cagg_policy_config(cagg, fcinfo, &policyconf);
if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);
parse_cagg_policy_config(cagg,
start_offset_type,
start_offset,
end_offset_type,
end_offset,
&policyconf);
/* Make sure there is only 1 refresh policy on the cagg */
jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
@ -650,10 +626,41 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
}
Datum
policy_refresh_cagg_remove(PG_FUNCTION_ARGS)
/*
 * SQL-callable entry point for add_continuous_aggregate_policy().
 * Extracts the fcinfo arguments and delegates to
 * policy_refresh_cagg_add_internal(), which is shared with the 1-step
 * policies_add() code path.
 */
policy_refresh_cagg_add(PG_FUNCTION_ARGS)
{
Oid cagg_oid, start_offset_type, end_offset_type;
Interval refresh_interval;
bool if_not_exists;
NullableDatum start_offset, end_offset;
cagg_oid = PG_GETARG_OID(0);
/* Schedule interval (arg 3) is mandatory; reject NULL up front. */
if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL refresh_schedule_interval")));
/* Offsets are polymorphic ("any"): capture value, NULL-ness and type. */
start_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
start_offset.value = PG_GETARG_DATUM(1);
start_offset.isnull = PG_ARGISNULL(1);
end_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 2);
end_offset.value = PG_GETARG_DATUM(2);
end_offset.isnull = PG_ARGISNULL(2);
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);
return policy_refresh_cagg_add_internal(cagg_oid,
start_offset_type,
start_offset,
end_offset_type,
end_offset,
refresh_interval,
if_not_exists);
}
Datum
policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists)
{
Oid cagg_oid = PG_GETARG_OID(0);
bool if_exists = PG_GETARG_BOOL(1);
int32 mat_htid;
ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(cagg_oid);
@ -680,13 +687,22 @@ policy_refresh_cagg_remove(PG_FUNCTION_ARGS)
ereport(NOTICE,
(errmsg("continuous aggregate policy not found for \"%s\", skipping",
get_rel_name(cagg_oid))));
PG_RETURN_VOID();
PG_RETURN_BOOL(false);
}
}
Assert(list_length(jobs) == 1);
BgwJob *job = linitial(jobs);
ts_bgw_job_delete_by_id(job->fd.id);
PG_RETURN_BOOL(true);
}
/*
 * SQL-callable wrapper: drop the refresh policy from a continuous
 * aggregate.  Keeps the historical VOID return type, so the boolean
 * result of the shared internal helper is deliberately discarded.
 */
Datum
policy_refresh_cagg_remove(PG_FUNCTION_ARGS)
{
	(void) policy_refresh_cagg_remove_internal(PG_GETARG_OID(0), PG_GETARG_BOOL(1));

	PG_RETURN_VOID();
}

View File

@ -23,4 +23,10 @@ bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type
Datum cmp_interval);
bool policy_refresh_cagg_exists(int32 materialization_id);
Datum policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type,
NullableDatum start_offset, Oid end_offset_type,
NullableDatum end_offset, Interval refresh_interval,
bool if_not_exists);
Datum policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists);
#endif /* TIMESCALEDB_TSL_BGW_POLICY_CAGG_API_H */

View File

@ -0,0 +1,363 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/xact.h>
#include <miscadmin.h>
#include <utils/builtins.h>
#include "compression_api.h"
#include "errors.h"
#include "hypertable.h"
#include "hypertable_cache.h"
#include "policy_utils.h"
#include "utils.h"
#include "jsonb_utils.h"
#include "bgw/job.h"
#include "bgw_policy/job.h"
#include "bgw_policy/continuous_aggregate_api.h"
#include "bgw_policy/policies_v2.h"
#include "funcapi.h"
#include "compat/compat.h"
/*
 * policies_add
 *		SQL-callable entry point for add_policies().
 *
 * Creates, in a single call, any combination of refresh, compression and
 * retention policies on a continuous aggregate.  A policy is created only
 * when its corresponding argument is non-NULL; refresh_schedule_interval
 * (arg 4) is mandatory whenever any refresh argument is supplied.
 *
 * Returns true when at least one policy job was created.
 */
Datum
policies_add(PG_FUNCTION_ARGS)
{
	Oid rel_oid = PG_GETARG_OID(0);
	bool if_not_exists = PG_GETARG_BOOL(1);
	int refresh_job_id = 0;
	int compression_job_id = 0;
	int retention_job_id = 0;

	/* Refresh policy: requested when any of args 2..4 is non-NULL. */
	if (!PG_ARGISNULL(2) || !PG_ARGISNULL(3) || !PG_ARGISNULL(4))
	{
		NullableDatum start_offset;
		NullableDatum end_offset;
		Oid start_offset_type;
		Oid end_offset_type;
		Interval refresh_interval = { 0, 0, 0 };

		if (PG_ARGISNULL(4))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("cannot use NULL refresh_schedule_interval")));

		/* Offsets are polymorphic ("any"): capture value, NULL-ness and type. */
		start_offset.value = PG_GETARG_DATUM(2);
		start_offset.isnull = PG_ARGISNULL(2);
		start_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 2);

		end_offset.value = PG_GETARG_DATUM(3);
		end_offset.isnull = PG_ARGISNULL(3);
		end_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 3);

		refresh_interval = *PG_GETARG_INTERVAL_P(4);

		refresh_job_id = policy_refresh_cagg_add_internal(rel_oid,
														  start_offset_type,
														  start_offset,
														  end_offset_type,
														  end_offset,
														  refresh_interval,
														  if_not_exists);
	}

	/* Compression policy (arg 5 = compress_after). */
	if (!PG_ARGISNULL(5))
		compression_job_id =
			policy_compression_add_internal(rel_oid,
											PG_GETARG_DATUM(5),
											get_fn_expr_argtype(fcinfo->flinfo, 5),
											if_not_exists);

	/* Retention policy (arg 6 = drop_after). */
	if (!PG_ARGISNULL(6))
		retention_job_id =
			policy_retention_add_internal(rel_oid,
										  get_fn_expr_argtype(fcinfo->flinfo, 6),
										  PG_GETARG_DATUM(6),
										  if_not_exists);

	PG_RETURN_BOOL(refresh_job_id || compression_job_id || retention_job_id);
}
/*
 * policies_remove
 *		SQL-callable entry point for remove_policies().
 *
 * Removes the policies named in the variadic text array from the given
 * continuous aggregate by dispatching to the per-policy remove helpers.
 * Returns false when no policy names were supplied, otherwise the result
 * of the last removal attempted.
 */
Datum
policies_remove(PG_FUNCTION_ARGS)
{
	Oid cagg_oid = PG_GETARG_OID(0);
	ArrayType *policy_array = PG_ARGISNULL(2) ? NULL : PG_GETARG_ARRAYTYPE_P(2);
	bool if_exists = PG_GETARG_BOOL(1);
	Datum *policy;
	int npolicies;
	int i;
	bool success = false;

	if (policy_array == NULL)
		PG_RETURN_BOOL(false);

	deconstruct_array(policy_array, TEXTOID, -1, false, TYPALIGN_INT, &policy, NULL, &npolicies);

	for (i = 0; i < npolicies; i++)
	{
		/*
		 * Convert the text datum into a NUL-terminated C string.  Comparing
		 * VARDATA() directly is unsafe: text data is not NUL-terminated, so
		 * pg_strcasecmp() could read past the end of the value and mis-match.
		 */
		char *curr_policy = TextDatumGetCString(policy[i]);

		if (pg_strcasecmp(curr_policy, POLICY_REFRESH_CAGG_PROC_NAME) == 0)
			success = policy_refresh_cagg_remove_internal(cagg_oid, if_exists);
		else if (pg_strcasecmp(curr_policy, POLICY_COMPRESSION_PROC_NAME) == 0)
			success = policy_compression_remove_internal(cagg_oid, if_exists);
		/* Retention keeps the original prefix-match semantics. */
		else if (pg_strncasecmp(curr_policy,
								POLICY_RETENTION_PROC_NAME,
								strlen(POLICY_RETENTION_PROC_NAME)) == 0)
			success = policy_retention_remove_internal(cagg_oid, if_exists);
		else
			ereport(NOTICE, (errmsg("No relevant policy found")));

		pfree(curr_policy);
	}

	PG_RETURN_BOOL(success);
}
/*
 * policies_alter
 *		SQL-callable entry point for alter_policies().
 *
 * Alters existing policies on a continuous aggregate by removing each
 * affected policy job and re-adding it with the merged configuration:
 * supplied (non-NULL) arguments override the current values; omitted
 * refresh offsets are re-read from the existing job's JSONB config.
 *
 * Returns true when at least one policy was re-created.
 */
Datum
policies_alter(PG_FUNCTION_ARGS)
{
Oid rel_oid = PG_GETARG_OID(0);
ContinuousAgg *cagg;
List *jobs;
/* if_exists stays false: altering a policy that does not exist errors out */
bool if_exists = false, found;
int refresh_job_id = 0, compression_job_id = 0, retention_job_id = 0;
cagg = ts_continuous_agg_find_by_relid(rel_oid);
if (!cagg)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(rel_oid))));
/* Refresh policy: requested when any of args 2..4 is non-NULL. */
if (!PG_ARGISNULL(2) || !PG_ARGISNULL(3) || !PG_ARGISNULL(4))
{
Interval refresh_interval;
NullableDatum start_offset, end_offset;
Oid start_offset_type, end_offset_type;
jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
INTERNAL_SCHEMA_NAME,
cagg->data.mat_hypertable_id);
/* NOTE(review): if no refresh policy exists, jobs looks like it could be
 * empty here and linitial() would misbehave -- confirm a guard exists. */
BgwJob *job = linitial(jobs);
/* Keep the current schedule interval unless a new one was supplied. */
refresh_interval = PG_ARGISNULL(4) ? job->fd.schedule_interval : *PG_GETARG_INTERVAL_P(4);
policy_refresh_cagg_remove_internal(rel_oid, if_exists);
if (PG_ARGISNULL(2))
{
/* start offset omitted: recover it from the old job's config */
if (IS_INTEGER_TYPE(cagg->partition_type))
{
int64 value =
ts_jsonb_get_int64_field(job->fd.config, CONFIG_KEY_START_OFFSET, &found);
start_offset.isnull = !found;
start_offset_type = cagg->partition_type;
/* Narrow the stored int64 back to the partition column's width. */
switch (start_offset_type)
{
case INT2OID:
start_offset.value = Int16GetDatum((int16) value);
break;
case INT4OID:
start_offset.value = Int32GetDatum((int32) value);
break;
case INT8OID:
start_offset.value = Int64GetDatum(value);
break;
default:
Assert(0);
}
}
else
{
start_offset.value = IntervalPGetDatum(
ts_jsonb_get_interval_field(job->fd.config, CONFIG_KEY_START_OFFSET));
start_offset.isnull = (DatumGetIntervalP(start_offset.value) == NULL);
start_offset_type = INTERVALOID;
}
}
else
{
/* start offset supplied by the caller */
start_offset.value = PG_GETARG_DATUM(2);
start_offset.isnull = false;
start_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 2);
}
if (PG_ARGISNULL(3))
{
/* end offset omitted: recover it from the old job's config */
if (IS_INTEGER_TYPE(cagg->partition_type))
{
int64 value =
ts_jsonb_get_int64_field(job->fd.config, CONFIG_KEY_END_OFFSET, &found);
end_offset.isnull = !found;
end_offset_type = cagg->partition_type;
switch (end_offset_type)
{
case INT2OID:
end_offset.value = Int16GetDatum((int16) value);
break;
case INT4OID:
end_offset.value = Int32GetDatum((int32) value);
break;
case INT8OID:
end_offset.value = Int64GetDatum(value);
break;
default:
Assert(0);
}
}
else
{
end_offset.value = IntervalPGetDatum(
ts_jsonb_get_interval_field(job->fd.config, CONFIG_KEY_END_OFFSET));
end_offset.isnull = (DatumGetIntervalP(end_offset.value) == NULL);
end_offset_type = INTERVALOID;
}
}
else
{
/* end offset supplied by the caller */
end_offset.value = PG_GETARG_DATUM(3);
end_offset.isnull = false;
end_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 3);
}
/* Re-create the refresh policy with the merged settings. */
refresh_job_id = policy_refresh_cagg_add_internal(rel_oid,
start_offset_type,
start_offset,
end_offset_type,
end_offset,
refresh_interval,
if_exists);
}
/* Compression policy: drop and re-add with the new compress_after. */
if (!PG_ARGISNULL(5))
{
Datum compress_after_datum = PG_GETARG_DATUM(5);
Oid compress_after_type = get_fn_expr_argtype(fcinfo->flinfo, 5);
policy_compression_remove_internal(rel_oid, if_exists);
compression_job_id = policy_compression_add_internal(rel_oid,
compress_after_datum,
compress_after_type,
if_exists);
}
/* Retention policy: drop and re-add with the new drop_after. */
if (!PG_ARGISNULL(6))
{
Datum drop_after_datum = PG_GETARG_DATUM(6);
Oid drop_after_type = get_fn_expr_argtype(fcinfo->flinfo, 6);
policy_retention_remove_internal(rel_oid, if_exists);
retention_job_id =
policy_retention_add_internal(rel_oid, drop_after_type, drop_after_datum, if_exists);
}
PG_RETURN_BOOL(refresh_job_id || compression_job_id || retention_job_id);
}
/*
 * Copy one value out of a job's JSONB config into the JSONB object being
 * built for show_policies(): read key json_label from job->fd.config and
 * emit it under the key show_config, as an int64 for integer-partitioned
 * caggs and as an interval otherwise.  Missing values become JSON null.
 */
static void
push_to_json(Oid type, JsonbParseState *parse_state, BgwJob *job, char *json_label,
			 char *show_config)
{
	if (IS_INTEGER_TYPE(type))
	{
		bool found;
		int64 int_val = ts_jsonb_get_int64_field(job->fd.config, json_label, &found);

		if (found)
			ts_jsonb_add_int64(parse_state, show_config, int_val);
		else
			ts_jsonb_add_null(parse_state, show_config);
	}
	else
	{
		Interval *interval_val = ts_jsonb_get_interval_field(job->fd.config, json_label);

		if (interval_val != NULL)
			ts_jsonb_add_interval(parse_state, show_config, interval_val);
		else
			ts_jsonb_add_null(parse_state, show_config);
	}
}
/*
 * policies_show
 *		SQL-callable set-returning function backing show_policies().
 *
 * Emits one JSONB object per background job attached to the cagg's
 * materialization hypertable, describing the refresh, compression or
 * retention policy it implements.
 */
Datum
policies_show(PG_FUNCTION_ARGS)
{
Oid rel_oid = PG_GETARG_OID(0);
Oid type;
ContinuousAgg *cagg;
ListCell *lc;
FuncCallContext *funcctx;
/* NOTE(review): a function-level static for SRF state is not safe if two
 * scans of this SRF are in progress at once -- confirm this is intended. */
static List *jobs;
JsonbParseState *parse_state = NULL;
cagg = ts_continuous_agg_find_by_relid(rel_oid);
if (!cagg)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(rel_oid))));
/* Timestamp-partitioned caggs store offsets as intervals in the config. */
type = IS_TIMESTAMP_TYPE(cagg->partition_type) ? INTERVALOID : cagg->partition_type;
/* A fresh JSONB object is started on every per-call invocation. */
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Use top-level memory context to preserve the global static list */
jobs = ts_bgw_job_find_by_hypertable_id(cagg->data.mat_hypertable_id);
funcctx->user_fctx = list_head(jobs);
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
/* user_fctx holds the cursor: the next job list cell to report. */
lc = (ListCell *) funcctx->user_fctx;
if (lc == NULL)
SRF_RETURN_DONE(funcctx);
else
{
BgwJob *job = lfirst(lc);
/* Identify the policy by its proc name and emit its config keys. */
if (!namestrcmp(&(job->fd.proc_name), POLICY_REFRESH_CAGG_PROC_NAME))
{
ts_jsonb_add_str(parse_state,
SHOW_POLICY_KEY_POLICY_NAME,
POLICY_REFRESH_CAGG_PROC_NAME);
push_to_json(type,
parse_state,
job,
CONFIG_KEY_START_OFFSET,
SHOW_POLICY_KEY_REFRESH_START_OFFSET);
push_to_json(type,
parse_state,
job,
CONFIG_KEY_END_OFFSET,
SHOW_POLICY_KEY_REFRESH_END_OFFSET);
ts_jsonb_add_interval(parse_state,
SHOW_POLICY_KEY_REFRESH_INTERVAL,
&(job->fd.schedule_interval));
}
else if (!namestrcmp(&(job->fd.proc_name), POLICY_COMPRESSION_PROC_NAME))
{
ts_jsonb_add_str(parse_state,
SHOW_POLICY_KEY_POLICY_NAME,
POLICY_COMPRESSION_PROC_NAME);
push_to_json(type,
parse_state,
job,
CONFIG_KEY_COMPRESS_AFTER,
SHOW_POLICY_KEY_COMPRESS_AFTER);
}
else if (!namestrcmp(&(job->fd.proc_name), POLICY_RETENTION_PROC_NAME))
{
ts_jsonb_add_str(parse_state, SHOW_POLICY_KEY_POLICY_NAME, POLICY_RETENTION_PROC_NAME);
push_to_json(type, parse_state, job, CONFIG_KEY_DROP_AFTER, SHOW_POLICY_KEY_DROP_AFTER);
}
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
/* Advance the cursor before returning the finished object. */
funcctx->user_fctx = lnext_compat(jobs, (ListCell *) funcctx->user_fctx);
SRF_RETURN_NEXT(funcctx, PointerGetDatum(JsonbValueToJsonb(result)));
}
PG_RETURN_NULL();
}

View File

@ -0,0 +1,42 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <utils/jsonb.h>
#include "dimension.h"
#include <continuous_aggs/materialize.h>
#include <bgw_policy/compression_api.h>
#include <bgw_policy/continuous_aggregate_api.h>
#include <bgw_policy/retention_api.h>
#define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate"
#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_RETENTION_PROC_NAME "policy_retention"
#define CONFIG_KEY_MAT_HYPERTABLE_ID "mat_hypertable_id"
#define CONFIG_KEY_START_OFFSET "start_offset"
#define CONFIG_KEY_END_OFFSET "end_offset"
#define CONFIG_KEY_COMPRESS_AFTER "compress_after"
#define CONFIG_KEY_DROP_AFTER "drop_after"
#define SHOW_POLICY_KEY_HYPERTABLE_ID "hypertable_id"
#define SHOW_POLICY_KEY_POLICY_NAME "policy_name"
#define SHOW_POLICY_KEY_REFRESH_INTERVAL "refresh_interval"
#define SHOW_POLICY_KEY_REFRESH_START_OFFSET "refresh_start_offset"
#define SHOW_POLICY_KEY_REFRESH_END_OFFSET "refresh_end_offset"
#define SHOW_POLICY_KEY_COMPRESS_AFTER CONFIG_KEY_COMPRESS_AFTER
#define SHOW_POLICY_KEY_DROP_AFTER CONFIG_KEY_DROP_AFTER
extern Datum policies_add(PG_FUNCTION_ARGS);
extern Datum policies_remove(PG_FUNCTION_ARGS);
extern Datum policies_alter(PG_FUNCTION_ARGS);
extern Datum policies_show(PG_FUNCTION_ARGS);
/*
 * Parsed refresh-window configuration of a CAgg refresh policy:
 * the partition column type plus the parsed start/end offsets.
 */
typedef struct CaggPolicyConfig
{
Oid partition_type; /* cagg partition column type */
CaggPolicyOffset offset_start; /* parsed refresh_start_offset */
CaggPolicyOffset offset_end; /* parsed refresh_end_offset */
} CaggPolicyConfig;

View File

@ -25,8 +25,8 @@
#include "utils.h"
#include "jsonb_utils.h"
#include "bgw_policy/job.h"
#include "bgw_policy/policies_v2.h"
#define POLICY_RETENTION_PROC_NAME "policy_retention"
#define CONFIG_KEY_HYPERTABLE_ID "hypertable_id"
#define CONFIG_KEY_DROP_AFTER "drop_after"
#define DEFAULT_SCHEDULE_INTERVAL \
@ -137,7 +137,7 @@ validate_drop_chunks_hypertable(Cache *hcache, Oid user_htoid)
}
Datum
policy_retention_add(PG_FUNCTION_ARGS)
policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum, bool if_not_exists)
{
/* behave like a strict function */
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
@ -165,8 +165,6 @@ policy_retention_add(PG_FUNCTION_ARGS)
/* Right now, there is an infinite number of retries for drop_chunks jobs */
int default_max_retries = -1;
TS_PREVENT_FUNC_IF_READ_ONLY();
/* Verify that the hypertable owner can create a background worker */
ts_bgw_job_validate_job_owner(owner_id);
@ -288,15 +286,24 @@ policy_retention_add(PG_FUNCTION_ARGS)
}
Datum
policy_retention_remove(PG_FUNCTION_ARGS)
policy_retention_add(PG_FUNCTION_ARGS)
{
Oid table_oid = PG_GETARG_OID(0);
bool if_exists = PG_GETARG_BOOL(1);
Cache *hcache;
Hypertable *hypertable;
Oid ht_oid = PG_GETARG_OID(0);
Datum window_datum = PG_GETARG_DATUM(1);
bool if_not_exists = PG_GETARG_BOOL(2);
Oid window_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
TS_PREVENT_FUNC_IF_READ_ONLY();
return policy_retention_add_internal(ht_oid, window_type, window_datum, if_not_exists);
}
Datum
policy_retention_remove_internal(Oid table_oid, bool if_exists)
{
Cache *hcache;
Hypertable *hypertable;
hypertable = ts_hypertable_cache_get_cache_and_entry(table_oid, CACHE_FLAG_MISSING_OK, &hcache);
if (!hypertable)
{
@ -339,7 +346,7 @@ policy_retention_remove(PG_FUNCTION_ARGS)
ereport(NOTICE,
(errmsg("retention policy not found for hypertable \"%s\", skipping",
get_rel_name(table_oid))));
PG_RETURN_NULL();
PG_RETURN_BOOL(false);
}
}
Assert(list_length(jobs) == 1);
@ -347,5 +354,16 @@ policy_retention_remove(PG_FUNCTION_ARGS)
ts_bgw_job_delete_by_id(job->fd.id);
PG_RETURN_NULL();
PG_RETURN_BOOL(true);
}
/*
 * SQL-callable wrapper: drop the retention policy from a hypertable or
 * continuous aggregate.  All work is delegated to
 * policy_retention_remove_internal(), shared with the 1-step policy API.
 */
Datum
policy_retention_remove(PG_FUNCTION_ARGS)
{
	TS_PREVENT_FUNC_IF_READ_ONLY();

	return policy_retention_remove_internal(PG_GETARG_OID(0), PG_GETARG_BOOL(1));
}

View File

@ -18,4 +18,7 @@ int32 policy_retention_get_hypertable_id(const Jsonb *config);
int64 policy_retention_get_drop_after_int(const Jsonb *config);
Interval *policy_retention_get_drop_after_interval(const Jsonb *config);
Datum policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
bool if_not_exists);
Datum policy_retention_remove_internal(Oid table_oid, bool if_exists);
#endif /* TIMESCALEDB_TSL_BGW_POLICY_RETENTION_API_H */

View File

@ -13,6 +13,7 @@
#include "bgw_policy/job.h"
#include "bgw_policy/job_api.h"
#include "bgw_policy/reorder_api.h"
#include "bgw_policy/policies_v2.h"
#include "chunk.h"
#include "chunk_api.h"
#include "compression/api.h"
@ -130,6 +131,11 @@ CrossModuleFunctions tsl_cm_functions = {
.copy_chunk_cleanup_proc = tsl_copy_chunk_cleanup_proc,
.subscription_exec = tsl_subscription_exec,
.policies_add = policies_add,
.policies_remove = policies_remove,
.policies_alter = policies_alter,
.policies_show = policies_show,
/* Continuous Aggregates */
.partialize_agg = tsl_partialize_agg,
.finalize_agg_sfunc = tsl_finalize_agg_sfunc,

View File

@ -46,6 +46,87 @@ SELECT count(*) FROM _timescaledb_config.bgw_job;
\set ON_ERROR_STOP 0
\set VERBOSITY default
-- Test 1 step policy for integer type buckets
ALTER materialized view mat_m1 set (timescaledb.compress = true);
-- No policy is added if one errors out
SELECT add_policies('mat_m1', refresh_start_offset => 1, refresh_end_offset => 10, refresh_schedule_interval =>'1 h'::interval, compress_after => 11, drop_after => 20);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "integer".
SELECT show_policies('mat_m1');
show_policies
---------------
(0 rows)
-- All policies are added in one step
SELECT add_policies('mat_m1', refresh_start_offset => 10, refresh_end_offset => 1, refresh_schedule_interval =>'1 h'::interval, compress_after => 11, drop_after => 20);
add_policies
--------------
t
(1 row)
SELECT show_policies('mat_m1');
show_policies
---------------------------------------------------------------------------------------------------------------------------------------------
{"policy_name": "policy_compression", "compress_after": 11}
{"policy_name": "policy_refresh_continuous_aggregate", "refresh_interval": "@ 1 hour", "refresh_end_offset": 1, "refresh_start_offset": 10}
{"drop_after": 20, "policy_name": "policy_retention"}
(3 rows)
-- Alter policies
SELECT alter_policies('mat_m1', refresh_schedule_interval =>'2 h'::interval, compress_after=>11, drop_after => 15);
alter_policies
----------------
t
(1 row)
SELECT show_policies('mat_m1');
show_policies
----------------------------------------------------------------------------------------------------------------------------------------------
{"policy_name": "policy_compression", "compress_after": 11}
{"policy_name": "policy_refresh_continuous_aggregate", "refresh_interval": "@ 2 hours", "refresh_end_offset": 1, "refresh_start_offset": 10}
{"drop_after": 15, "policy_name": "policy_retention"}
(3 rows)
-- Remove one or more policy
SELECT remove_policies('mat_m1', false, 'policy_refresh_continuous_aggregate', 'policy_compression');
remove_policies
-----------------
t
(1 row)
SELECT show_policies('mat_m1');
show_policies
-------------------------------------------------------
{"drop_after": 15, "policy_name": "policy_retention"}
(1 row)
-- Add one policy
SELECT add_policies('mat_m1', refresh_start_offset => 10, refresh_end_offset => 1, refresh_schedule_interval =>'1 h'::interval);
add_policies
--------------
t
(1 row)
SELECT show_policies('mat_m1');
show_policies
---------------------------------------------------------------------------------------------------------------------------------------------
{"policy_name": "policy_refresh_continuous_aggregate", "refresh_interval": "@ 1 hour", "refresh_end_offset": 1, "refresh_start_offset": 10}
{"drop_after": 15, "policy_name": "policy_retention"}
(2 rows)
-- Remove all policies
SELECT remove_policies('mat_m1', false, 'policy_refresh_continuous_aggregate', 'policy_retention');
remove_policies
-----------------
t
(1 row)
SELECT show_policies('mat_m1');
show_policies
---------------
(0 rows)
ALTER materialized view mat_m1 set (timescaledb.compress = false);
SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval);
ERROR: "int_tab" is not a continuous aggregate
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval);
@ -71,7 +152,7 @@ SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_
--adding again should warn/error
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>false);
ERROR: continuous aggregate policy already exists for "mat_m1"
DETAIL: Only one continuous aggregate policy can be created per continuous aggregate and a policy with job id 1000 already exists for "mat_m1".
DETAIL: Only one continuous aggregate policy can be created per continuous aggregate and a policy with job id 1007 already exists for "mat_m1".
SELECT add_continuous_aggregate_policy('mat_m1', 20, 15, '1h'::interval, if_not_exists=>true);
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
@ -146,7 +227,7 @@ SELECT create_hypertable('continuous_agg_max_mat_date', 'time');
NOTICE: adding not-null constraint to column "time"
create_hypertable
------------------------------------------
(3,public,continuous_agg_max_mat_date,t)
(4,public,continuous_agg_max_mat_date,t)
(1 row)
CREATE MATERIALIZED VIEW max_mat_view_date
@ -156,6 +237,114 @@ CREATE MATERIALIZED VIEW max_mat_view_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
\set VERBOSITY default
-- Test 1 step policy for timestamp type buckets
ALTER materialized view max_mat_view_date set (timescaledb.compress = true);
-- Only works for cagg
SELECT add_policies('continuous_agg_max_mat_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, refresh_schedule_interval =>'1 h'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
ERROR: "continuous_agg_max_mat_date" is not a continuous aggregate
SELECT show_policies('continuous_agg_max_mat_date');
ERROR: "continuous_agg_max_mat_date" is not a continuous aggregate
SELECT alter_policies('continuous_agg_max_mat_date', compress_after=>'16 days'::interval);
ERROR: "continuous_agg_max_mat_date" is not a continuous aggregate
SELECT remove_policies('continuous_agg_max_mat_date', false, 'policy_refresh_continuous_aggregate');
ERROR: "continuous_agg_max_mat_date" is not a continuous aggregate
-- No policy is added if one errors out
SELECT add_policies('max_mat_view_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, refresh_schedule_interval =>'1 h'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date".
SELECT show_policies('max_mat_view_date');
show_policies
---------------
(0 rows)
-- Refresh schedule interval cannot be NULL
SELECT add_policies('max_mat_view_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
ERROR: cannot use NULL refresh_schedule_interval
-- All policies are added in one step
SELECT add_policies('max_mat_view_date', refresh_start_offset => '15 days'::interval, refresh_end_offset => '1 day'::interval, refresh_schedule_interval =>'1 h'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
add_policies
--------------
t
(1 row)
SELECT show_policies('max_mat_view_date');
show_policies
--------------------------------------------------------------------------------------------------------------------------------------------------------------
{"policy_name": "policy_compression", "compress_after": "@ 20 days"}
{"policy_name": "policy_refresh_continuous_aggregate", "refresh_interval": "@ 1 hour", "refresh_end_offset": "@ 1 day", "refresh_start_offset": "@ 15 days"}
{"drop_after": "@ 25 days", "policy_name": "policy_retention"}
(3 rows)
-- Alter policies
SELECT alter_policies('max_mat_view_date', refresh_schedule_interval =>'2 h', compress_after=>'16 days'::interval, drop_after => '20 days'::interval);
alter_policies
----------------
t
(1 row)
SELECT show_policies('max_mat_view_date');
show_policies
---------------------------------------------------------------------------------------------------------------------------------------------------------------
{"policy_name": "policy_compression", "compress_after": "@ 16 days"}
{"policy_name": "policy_refresh_continuous_aggregate", "refresh_interval": "@ 2 hours", "refresh_end_offset": "@ 1 day", "refresh_start_offset": "@ 15 days"}
{"drop_after": "@ 20 days", "policy_name": "policy_retention"}
(3 rows)
-- Remove one or more policy
-- Code coverage: no policy names provided
SELECT remove_policies('max_mat_view_date', false);
remove_policies
-----------------
f
(1 row)
-- Code coverage: incorrect name of policy
SELECT remove_policies('max_mat_view_date', false, 'refresh_policy');
NOTICE: No relevant policy found
remove_policies
-----------------
f
(1 row)
SELECT remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate', 'policy_compression');
remove_policies
-----------------
t
(1 row)
SELECT show_policies('max_mat_view_date');
show_policies
----------------------------------------------------------------
{"drop_after": "@ 20 days", "policy_name": "policy_retention"}
(1 row)
-- Add one policy
SELECT add_policies('max_mat_view_date', refresh_start_offset => '15 day'::interval, refresh_end_offset => '1 day'::interval, refresh_schedule_interval =>'1 h'::interval);
add_policies
--------------
t
(1 row)
SELECT show_policies('max_mat_view_date');
show_policies
--------------------------------------------------------------------------------------------------------------------------------------------------------------
{"policy_name": "policy_refresh_continuous_aggregate", "refresh_interval": "@ 1 hour", "refresh_end_offset": "@ 1 day", "refresh_start_offset": "@ 15 days"}
{"drop_after": "@ 20 days", "policy_name": "policy_retention"}
(2 rows)
-- Remove all policies
SELECT remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate', 'policy_retention');
remove_policies
-----------------
t
(1 row)
SELECT show_policies('max_mat_view_date');
show_policies
---------------
(0 rows)
ALTER materialized view max_mat_view_date set (timescaledb.compress = false);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
ERROR: invalid parameter value for end_offset
HINT: Use time interval with a continuous aggregate using timestamp-based time bucket.
@ -179,7 +368,7 @@ DETAIL: The start and end offsets must cover at least two buckets in the valid
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1001
1015
(1 row)
SELECT remove_continuous_aggregate_policy('max_mat_view_date');
@ -192,7 +381,7 @@ SELECT remove_continuous_aggregate_policy('max_mat_view_date');
SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1002
1016
(1 row)
SELECT remove_continuous_aggregate_policy('max_mat_view_date');
@ -206,7 +395,7 @@ SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
--------------------------------------------------------------------------------
{"end_offset": "@ 1 day", "start_offset": "@ 15 days", "mat_hypertable_id": 4}
{"end_offset": "@ 1 day", "start_offset": "@ 15 days", "mat_hypertable_id": 5}
(1 row)
INSERT INTO continuous_agg_max_mat_date
@ -221,7 +410,7 @@ SELECT create_hypertable('continuous_agg_timestamp', 'time');
NOTICE: adding not-null constraint to column "time"
create_hypertable
---------------------------------------
(5,public,continuous_agg_timestamp,t)
(7,public,continuous_agg_timestamp,t)
(1 row)
CREATE MATERIALIZED VIEW max_mat_view_timestamp
@ -234,7 +423,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1004
1018
(1 row)
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
@ -264,7 +453,7 @@ SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
---------------------------------------------------------------------------------
{"end_offset": "@ 1 hour", "start_offset": "@ 15 days", "mat_hypertable_id": 6}
{"end_offset": "@ 1 hour", "start_offset": "@ 15 days", "mat_hypertable_id": 8}
(1 row)
\c :TEST_DBNAME :ROLE_SUPERUSER
@ -517,13 +706,13 @@ ERROR: compress_after value for compression policy should be greater than the s
SELECT add_compression_policy('mat_smallint', 5::smallint);
add_compression_policy
------------------------
1012
1026
(1 row)
SELECT add_compression_policy('mat_bigint', 20::bigint);
add_compression_policy
------------------------
1013
1027
(1 row)
-- end of coverage tests
@ -540,7 +729,7 @@ CREATE TABLE metrics (
SELECT create_hypertable('metrics', 'time');
create_hypertable
-----------------------
(13,public,metrics,t)
(15,public,metrics,t)
(1 row)
INSERT INTO metrics (time, device_id, device_id_peer, v0, v1, v2, v3)
@ -557,7 +746,7 @@ ALTER TABLE metrics SET ( timescaledb.compress );
SELECT compress_chunk(ch) FROM show_chunks('metrics') ch;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_13_19_chunk
_timescaledb_internal._hyper_15_19_chunk
(1 row)
CREATE MATERIALIZED VIEW metrics_cagg WITH (timescaledb.continuous,
@ -581,7 +770,7 @@ ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress);
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
COMP_JOB
----------
1015
1029
(1 row)
SELECT remove_compression_policy('metrics_cagg');

View File

@ -134,9 +134,11 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
add_data_node(name,text,name,integer,boolean,boolean,text)
add_dimension(regclass,name,integer,anyelement,regproc,boolean)
add_job(regproc,interval,jsonb,timestamp with time zone,boolean)
add_policies(regclass,boolean,"any","any",interval,"any","any")
add_reorder_policy(regclass,name,boolean)
add_retention_policy(regclass,"any",boolean,interval)
alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean)
alter_policies(regclass,boolean,"any","any",interval,"any","any")
approximate_row_count(regclass)
attach_data_node(name,regclass,boolean,boolean)
attach_tablespace(name,regclass,boolean)
@ -172,6 +174,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
refresh_continuous_aggregate(regclass,"any","any")
remove_compression_policy(regclass,boolean)
remove_continuous_aggregate_policy(regclass,boolean)
remove_policies(regclass,boolean,text[])
remove_reorder_policy(regclass,boolean)
remove_retention_policy(regclass,boolean)
reorder_chunk(regclass,regclass,boolean)
@ -182,6 +185,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
set_number_partitions(regclass,integer,name)
set_replication_factor(regclass,integer)
show_chunks(regclass,"any","any")
show_policies(regclass,boolean)
show_tablespaces(regclass)
time_bucket(bigint,bigint)
time_bucket(bigint,bigint,bigint)

View File

@ -41,6 +41,35 @@ SELECT count(*) FROM _timescaledb_config.bgw_job;
\set ON_ERROR_STOP 0
\set VERBOSITY default
-- Test 1 step policy for integer type buckets
ALTER materialized view mat_m1 set (timescaledb.compress = true);
-- No policy is added if one errors out
SELECT add_policies('mat_m1', refresh_start_offset => 1, refresh_end_offset => 10, refresh_schedule_interval =>'1 h'::interval, compress_after => 11, drop_after => 20);
SELECT show_policies('mat_m1');
-- All policies are added in one step
SELECT add_policies('mat_m1', refresh_start_offset => 10, refresh_end_offset => 1, refresh_schedule_interval =>'1 h'::interval, compress_after => 11, drop_after => 20);
SELECT show_policies('mat_m1');
-- Alter policies
SELECT alter_policies('mat_m1', refresh_schedule_interval =>'2 h'::interval, compress_after=>11, drop_after => 15);
SELECT show_policies('mat_m1');
-- Remove one or more policy
SELECT remove_policies('mat_m1', false, 'policy_refresh_continuous_aggregate', 'policy_compression');
SELECT show_policies('mat_m1');
-- Add one policy
SELECT add_policies('mat_m1', refresh_start_offset => 10, refresh_end_offset => 1, refresh_schedule_interval =>'1 h'::interval);
SELECT show_policies('mat_m1');
-- Remove all policies
SELECT remove_policies('mat_m1', false, 'policy_refresh_continuous_aggregate', 'policy_retention');
SELECT show_policies('mat_m1');
ALTER materialized view mat_m1 set (timescaledb.compress = false);
SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval);
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval);
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 );
@ -100,6 +129,50 @@ CREATE MATERIALIZED VIEW max_mat_view_date
\set ON_ERROR_STOP 0
\set VERBOSITY default
-- Test 1 step policy for timestamp type buckets
ALTER materialized view max_mat_view_date set (timescaledb.compress = true);
-- Only works for cagg
SELECT add_policies('continuous_agg_max_mat_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, refresh_schedule_interval =>'1 h'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
SELECT show_policies('continuous_agg_max_mat_date');
SELECT alter_policies('continuous_agg_max_mat_date', compress_after=>'16 days'::interval);
SELECT remove_policies('continuous_agg_max_mat_date', false, 'policy_refresh_continuous_aggregate');
-- No policy is added if one errors out
SELECT add_policies('max_mat_view_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, refresh_schedule_interval =>'1 h'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
SELECT show_policies('max_mat_view_date');
-- Refresh schedule interval cannot be NULL
SELECT add_policies('max_mat_view_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
-- All policies are added in one step
SELECT add_policies('max_mat_view_date', refresh_start_offset => '15 days'::interval, refresh_end_offset => '1 day'::interval, refresh_schedule_interval =>'1 h'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval);
SELECT show_policies('max_mat_view_date');
-- Alter policies
SELECT alter_policies('max_mat_view_date', refresh_schedule_interval =>'2 h', compress_after=>'16 days'::interval, drop_after => '20 days'::interval);
SELECT show_policies('max_mat_view_date');
-- Remove one or more policy
-- Code coverage: no policy names provided
SELECT remove_policies('max_mat_view_date', false);
-- Code coverage: incorrect name of policy
SELECT remove_policies('max_mat_view_date', false, 'refresh_policy');
SELECT remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate', 'policy_compression');
SELECT show_policies('max_mat_view_date');
-- Add one policy
SELECT add_policies('max_mat_view_date', refresh_start_offset => '15 day'::interval, refresh_end_offset => '1 day'::interval, refresh_schedule_interval =>'1 h'::interval);
SELECT show_policies('max_mat_view_date');
-- Remove all policies
SELECT remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate', 'policy_retention');
SELECT show_policies('max_mat_view_date');
ALTER materialized view max_mat_view_date set (timescaledb.compress = false);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
--start_interval < end_interval
SELECT add_continuous_aggregate_policy('max_mat_view_date', '1 day'::interval, '2 days'::interval , '1 day'::interval) ;