From 82a30ae420890f8361c6c8f57ce77d3974c80336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 11 Feb 2021 13:16:29 +0100 Subject: [PATCH] Validate continuous aggregate policy This change adds validation of the settings for a continuous aggregate policy when the policy is created. Previously it was possible to create policies that would either fail at runtime or never refresh anything due to bad configuration. In particular, the refresh window (start and end offsets for refreshing) must now be at least two buckets in size or an error is generated when the policy is created. The policy must cover at least two buckets to ensure that at least one bucket is refreshed when the policy runs, since it is unlikely that the policy runs at a time that is perfectly aligned with the beginning of a bucket. Note that it is still possible to create policies that might not refresh anything depending on the time when it runs. For instance, if the "current" time is close to the minimum allowed time value, the refresh window can lag enough to fall outside the valid time range (e.g., the end offset is big enough to push the window outside the valid time range). As time moves on, the window would eventually move into the valid time range, however. Fixes #2929 --- CHANGELOG.md | 1 + src/continuous_agg.c | 61 ++- src/continuous_agg.h | 7 +- src/process_utility.c | 9 +- test/sql/updates/setup.continuous_aggs.v2.sql | 4 +- tsl/src/bgw_policy/continuous_aggregate_api.c | 392 +++++++++++++----- tsl/src/bgw_policy/job.c | 10 +- tsl/src/continuous_aggs/create.c | 8 +- tsl/src/continuous_aggs/refresh.c | 57 +-- tsl/src/continuous_aggs/refresh.h | 11 +- tsl/test/expected/continuous_aggs_bgw.out | 30 +- tsl/test/expected/continuous_aggs_policy.out | 176 +++++--- .../expected/continuous_aggs_policy_run.out | 2 +- tsl/test/sql/continuous_aggs_bgw.sql | 4 +- tsl/test/sql/continuous_aggs_policy.sql | 92 ++-- tsl/test/sql/continuous_aggs_policy_run.sql | 2 +- tsl/test/sql/continuous_aggs_refresh.sql | 1 - 17 files changed, 595 insertions(+), 272 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a0083923..f84b6e2bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.** **Bugfixes** * #2883 Fix join qual propagation for nested joins * #2908 Fix changing column type of clustered hypertables +* #2942 Validate continuous aggregate policy **Thanks** * @zeeshanshabbir93 for reporting an issue with joins diff --git a/src/continuous_agg.c b/src/continuous_agg.c index ec11dfe3b..a4e83c848 100644 --- a/src/continuous_agg.c +++ b/src/continuous_agg.c @@ -199,9 +199,18 @@ static void continuous_agg_init(ContinuousAgg *cagg, const Form_continuous_agg fd) { Oid nspid = get_namespace_oid(NameStr(fd->user_view_schema), false); + Hypertable *cagg_ht = ts_hypertable_get_by_id(fd->mat_hypertable_id); + Dimension *time_dim; + Assert(NULL != cagg_ht); + time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0); + Assert(NULL != time_dim); + cagg->partition_type = ts_dimension_get_partition_type(time_dim); cagg->relid = get_relname_relid(NameStr(fd->user_view_name), nspid); memcpy(&cagg->data, fd, sizeof(cagg->data)); + + Assert(OidIsValid(cagg->relid)); + Assert(OidIsValid(cagg->partition_type)); } TSDLLEXPORT ContinuousAggHypertableStatus @@ -294,12 +303,11 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id) return ca; } -ContinuousAgg * -ts_continuous_agg_find_by_view_name(const char *schema, const char *name, - ContinuousAggViewType type) +static bool +continuous_agg_fill_form_data(const char *schema, const char *name, ContinuousAggViewType type, + FormData_continuous_agg *fd) { ScanIterator iterator; - ContinuousAgg *ca = NULL; AttrNumber view_name_attrnum = 0; AttrNumber schema_name_attrnum = 0; int count = 0; @@ -353,8 +361,7 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name, if (vtype != ContinuousAggAnyView) { - ca = ts_scan_iterator_alloc_result(&iterator, sizeof(*ca)); - continuous_agg_init(ca, data); + memcpy(fd, data, sizeof(*fd)); count++; } @@ -364,6 +371,22 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name, Assert(count <= 1); + return count == 1; +} + +ContinuousAgg * +ts_continuous_agg_find_by_view_name(const char *schema, const char *name, + ContinuousAggViewType type) +{ + FormData_continuous_agg fd; + ContinuousAgg *ca; + + if (!continuous_agg_fill_form_data(schema, name, type, &fd)) + return NULL; + + ca = palloc0(sizeof(ContinuousAgg)); + continuous_agg_init(ca, &fd); + return ca; } @@ -626,12 +649,12 @@ ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id) /* Block dropping the partial and direct view if the continuous aggregate still exists */ static void -drop_internal_view(ContinuousAgg *agg) +drop_internal_view(const FormData_continuous_agg *fd) { ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext); int count = 0; - init_scan_by_mat_hypertable_id(&iterator, agg->data.mat_hypertable_id); + init_scan_by_mat_hypertable_id(&iterator, fd->mat_hypertable_id); ts_scanner_foreach(&iterator) { TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); @@ -647,25 +670,37 @@ drop_internal_view(ContinuousAgg *agg) } /* This gets called when a view gets dropped. */ -void -ts_continuous_agg_drop_view_callback(ContinuousAgg *ca, const char *schema, const char *name) +static void +continuous_agg_drop_view_callback(FormData_continuous_agg *fd, const char *schema, const char *name) { ContinuousAggViewType vtyp; - vtyp = ts_continuous_agg_view_type(&ca->data, schema, name); + vtyp = ts_continuous_agg_view_type(fd, schema, name); switch (vtyp) { case ContinuousAggUserView: - drop_continuous_agg(&ca->data, false /* The user view has already been dropped */); + drop_continuous_agg(fd, false /* The user view has already been dropped */); break; case ContinuousAggPartialView: case ContinuousAggDirectView: - drop_internal_view(ca); + drop_internal_view(fd); break; default: elog(ERROR, "unknown continuous aggregate view type"); } } +bool +ts_continuous_agg_drop(const char *view_schema, const char *view_name) +{ + FormData_continuous_agg fd; + bool found = continuous_agg_fill_form_data(view_schema, view_name, ContinuousAggAnyView, &fd); + + if (found) + continuous_agg_drop_view_callback(&fd, view_schema, view_name); + + return found; +} + static inline bool ts_continuous_agg_is_user_view_schema(FormData_continuous_agg *data, const char *schema) { diff --git a/src/continuous_agg.h b/src/continuous_agg.h index 856ef5d8a..d44d845bc 100644 --- a/src/continuous_agg.h +++ b/src/continuous_agg.h @@ -37,7 +37,10 @@ extern TSDLLEXPORT WithClauseResult *ts_continuous_agg_with_clause_parse(const L typedef struct ContinuousAgg { FormData_continuous_agg data; + /* Relid of the user-facing view */ Oid relid; + /* Type of the primary partitioning dimension */ + Oid partition_type; } ContinuousAgg; typedef enum ContinuousAggHypertableStatus @@ -69,9 +72,7 @@ extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_relid(Oid relid); extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_rv(const RangeVar *rv); -extern void ts_continuous_agg_drop_view_callback(ContinuousAgg *ca, const char *schema, - const char *name); - +extern bool ts_continuous_agg_drop(const char *view_schema, const char *view_name); extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id); extern TSDLLEXPORT ContinuousAggViewType ts_continuous_agg_view_type(FormData_continuous_agg *data, diff --git a/src/process_utility.c b/src/process_utility.c index acfdf800a..0963e7392 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -3827,14 +3827,7 @@ process_drop_trigger(EventTriggerDropObject *obj) static void process_drop_view(EventTriggerDropView *dropped_view) { - ContinuousAgg *ca; - - ca = ts_continuous_agg_find_by_view_name(dropped_view->schema, - dropped_view->view_name, - ContinuousAggAnyView); - - if (ca != NULL) - ts_continuous_agg_drop_view_callback(ca, dropped_view->schema, dropped_view->view_name); + ts_continuous_agg_drop(dropped_view->schema, dropped_view->view_name); } static void diff --git a/test/sql/updates/setup.continuous_aggs.v2.sql b/test/sql/updates/setup.continuous_aggs.v2.sql index 174181f08..5664c2e05 100644 --- a/test/sql/updates/setup.continuous_aggs.v2.sql +++ b/test/sql/updates/setup.continuous_aggs.v2.sql @@ -368,7 +368,7 @@ BEGIN IF ts_version < '2.0.0' THEN CREATE VIEW mat_inttime WITH ( timescaledb.continuous, timescaledb.materialized_only=true, - timescaledb.ignore_invalidation_older_than = 5, + timescaledb.ignore_invalidation_older_than = 6, timescaledb.refresh_lag = 2, timescaledb.refresh_interval='12 hours') AS @@ -400,7 +400,7 @@ BEGIN FROM int_time_test GROUP BY 1 WITH NO DATA; - PERFORM add_continuous_aggregate_policy('mat_inttime', 5, 2, '12 hours'); + PERFORM add_continuous_aggregate_policy('mat_inttime', 6, 2, '12 hours'); PERFORM add_continuous_aggregate_policy('mat_inttime2', NULL, 2, '12 hours'); END IF; END $$; diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.c b/tsl/src/bgw_policy/continuous_aggregate_api.c index 596f3bf73..42e49dd09 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.c +++ b/tsl/src/bgw_policy/continuous_aggregate_api.c @@ -5,6 +5,8 @@ */ #include +#include +#include #include #include #include @@ -50,36 +52,62 @@ policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config) } static int64 -get_interval_from_config(const Dimension *dim, const Jsonb *config, const char *json_label, - bool *isnull) +get_time_from_interval(const Dimension *dim, Datum interval, Oid type) +{ + Oid partitioning_type = ts_dimension_get_partition_type(dim); + + if (IS_INTEGER_TYPE(type)) + { + Oid now_func = ts_get_integer_now_func(dim); + int64 value = ts_interval_value_to_internal(interval, type); + + Assert(now_func); + + return ts_subtract_integer_from_now_saturating(now_func, value, partitioning_type); + } + else if (type == INTERVALOID) + { + Datum res = subtract_interval_from_now(DatumGetIntervalP(interval), partitioning_type); + return ts_time_value_to_internal(res, partitioning_type); + } + else + elog(ERROR, "unsupported offset type for continuous aggregate policy"); + + pg_unreachable(); + + return 0; +} + +static int64 +get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json_label, + bool *isnull) { Oid partitioning_type = ts_dimension_get_partition_type(dim); *isnull = false; + if (IS_INTEGER_TYPE(partitioning_type)) { bool found; int64 interval_val = ts_jsonb_get_int64_field(config, json_label, &found); + if (!found) { *isnull = true; return 0; } - Oid now_func = ts_get_integer_now_func(dim); - - Assert(now_func); - return ts_subtract_integer_from_now_saturating(now_func, interval_val, partitioning_type); + return get_time_from_interval(dim, Int64GetDatum(interval_val), INT8OID); } else { - Datum res; Interval *interval_val = ts_jsonb_get_interval_field(config, json_label); + if (!interval_val) { *isnull = true; return 0; } - res = subtract_interval_from_now(interval_val, partitioning_type); - return ts_time_value_to_internal(res, partitioning_type); + + return get_time_from_interval(dim, IntervalPGetDatum(interval_val), INTERVALOID); } } @@ -87,7 +115,7 @@ int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config) { bool start_isnull; - int64 res = get_interval_from_config(dim, config, CONFIG_KEY_START_OFFSET, &start_isnull); + int64 res = get_time_from_config(dim, config, CONFIG_KEY_START_OFFSET, &start_isnull); /* interpret NULL as min value for that type */ if (start_isnull) return ts_time_get_min(ts_dimension_get_partition_type(dim)); @@ -98,7 +126,7 @@ int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config) { bool end_isnull; - int64 res = get_interval_from_config(dim, config, CONFIG_KEY_END_OFFSET, &end_isnull); + int64 res = get_time_from_config(dim, config, CONFIG_KEY_END_OFFSET, &end_isnull); if (end_isnull) return ts_time_get_end_or_max(ts_dimension_get_partition_type(dim)); return res; @@ -159,12 +187,13 @@ static Datum convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg) { Oid convert_to = dim_type; + Datum converted; + + if (IS_TIMESTAMP_TYPE(dim_type)) + convert_to = INTERVALOID; if (*interval_type != convert_to) { - if (IS_TIMESTAMP_TYPE(dim_type)) - convert_to = INTERVALOID; - if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT)) { if (IS_INTEGER_TYPE(dim_type)) @@ -183,68 +212,253 @@ convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const cha } } - return ts_time_datum_convert_arg(interval, interval_type, convert_to); + converted = ts_time_datum_convert_arg(interval, interval_type, convert_to); + + /* For integer types, first convert all types to int64 to get on a common + * type. Then check valid time ranges against the partition/dimension + * type */ + switch (*interval_type) + { + case INT2OID: + converted = Int64GetDatum((int64) DatumGetInt16(converted)); + break; + case INT4OID: + converted = Int64GetDatum((int64) DatumGetInt32(converted)); + break; + case INT8OID: + break; + case INTERVALOID: + /* For timestamp types, we only support Interval, so nothing further + * to do. */ + return converted; + default: + pg_unreachable(); + break; + } + + /* Cap at min and max */ + if (DatumGetInt64(converted) < ts_time_get_min(dim_type)) + converted = ts_time_get_min(dim_type); + else if (DatumGetInt64(converted) > ts_time_get_max(dim_type)) + converted = ts_time_get_max(dim_type); + + /* Convert to the desired integer type */ + switch (dim_type) + { + case INT2OID: + converted = Int16GetDatum((int16) DatumGetInt64(converted)); + break; + case INT4OID: + converted = Int32GetDatum((int32) DatumGetInt64(converted)); + break; + case INT8OID: + /* Already int64, so nothing to do. */ + break; + default: + pg_unreachable(); + break; + } + + *interval_type = dim_type; + + return converted; +} + +typedef struct CaggPolicyOffset +{ + Datum value; + Oid type; + bool isnull; + const char *name; +} CaggPolicyOffset; + +typedef struct CaggPolicyConfig +{ + Oid partition_type; + CaggPolicyOffset offset_start; + CaggPolicyOffset offset_end; +} CaggPolicyConfig; + +/* + * Convert an interval to a 128 integer value. + * + * Based on PostgreSQL's interval_cmp_value(). + */ +static inline INT128 +interval_to_int128(const Interval *interval) +{ + INT128 span; + int64 dayfraction; + int64 days; + + /* + * Separate time field into days and dayfraction, then add the month and + * day fields to the days part. We cannot overflow int64 days here. + */ + dayfraction = interval->time % USECS_PER_DAY; + days = interval->time / USECS_PER_DAY; + days += interval->month * INT64CONST(30); + days += interval->day; + + /* Widen dayfraction to 128 bits */ + span = int64_to_int128(dayfraction); + + /* Scale up days to microseconds, forming a 128-bit product */ + int128_add_int64_mul_int64(&span, days, USECS_PER_DAY); + + return span; +} + +static int64 +interval_to_int64(Datum interval, Oid type) +{ + switch (type) + { + case INT2OID: + return DatumGetInt16(interval); + case INT4OID: + return DatumGetInt32(interval); + case INT8OID: + return DatumGetInt64(interval); + case INTERVALOID: + { + const int64 max = ts_time_get_max(TIMESTAMPTZOID); + const int64 min = ts_time_get_min(TIMESTAMPTZOID); + INT128 bigres = interval_to_int128(DatumGetIntervalP(interval)); + + if (int128_compare(bigres, int64_to_int128(max)) >= 0) + return max; + else if (int128_compare(bigres, int64_to_int128(min)) <= 0) + return min; + else + return int128_to_int64(bigres); + } + default: + break; + } + + pg_unreachable(); + + return 0; +} + +static const char * +two_buckets_to_str(const ContinuousAgg *cagg) +{ + Oid bucket_type; + Oid outfuncid; + int64 two_buckets; + Datum min_range; + bool isvarlena; + + if (IS_TIMESTAMP_TYPE(cagg->partition_type)) + bucket_type = INTERVALOID; + else + bucket_type = cagg->partition_type; + + two_buckets = ts_time_saturating_add(cagg->data.bucket_width, + cagg->data.bucket_width, + cagg->partition_type); + + min_range = ts_internal_to_interval_value(two_buckets, bucket_type); + + getTypeOutputInfo(bucket_type, &outfuncid, &isvarlena); + Assert(!isvarlena); + + return DatumGetCString(OidFunctionCall1(outfuncid, min_range)); +} + +/* + * Enforce that a policy has a refresh window of at least two buckets to + * ensure we materialize at least one bucket each run. + * + * Why two buckets? Note that the policy probably won't execute at at time + * that exactly aligns with a bucket boundary, so a window of one bucket + * might not cover a full bucket that we want to materialize: + * + * Refresh window: [-----) + * Materialized buckets: |-----|-----|-----| + */ +static void +validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config) +{ + int64 start_offset; + int64 end_offset; + + if (config->offset_start.isnull) + start_offset = ts_time_get_max(cagg->partition_type); + else + start_offset = interval_to_int64(config->offset_start.value, config->offset_start.type); + + if (config->offset_end.isnull) + end_offset = ts_time_get_min(cagg->partition_type); + else + end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type); + + if (ts_time_saturating_add(end_offset, cagg->data.bucket_width * 2, INT8OID) > start_offset) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("policy refresh window too small"), + errdetail("The start and end offsets must cover at least" + " two buckets in the valid time range of type \"%s\".", + format_type_be(cagg->partition_type)), + errhint("Use a start and end offset that specifies" + " a window of at least %s.", + two_buckets_to_str(cagg)))); } static void -check_valid_interval_values(Oid interval_type, Datum start_offset, Datum end_offset) +parse_offset_arg(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo, CaggPolicyOffset *offset, + int argnum) { - bool valid = true; - if (IS_INTEGER_TYPE(interval_type)) + offset->isnull = PG_ARGISNULL(argnum); + + if (!offset->isnull) { - switch (interval_type) - { - case INT2OID: - { - if (DatumGetInt16(start_offset) <= DatumGetInt16(end_offset)) - valid = false; - break; - } - case INT4OID: - { - if (DatumGetInt32(start_offset) <= DatumGetInt32(end_offset)) - valid = false; - break; - } - case INT8OID: - { - if (DatumGetInt64(start_offset) <= DatumGetInt64(end_offset)) - valid = false; - break; - } - } - } - else - { - Assert(interval_type == INTERVALOID); - valid = DatumGetBool(DirectFunctionCall2(interval_gt, start_offset, end_offset)); - } - if (!valid) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("start interval should be greater than end interval"))); + Oid type = get_fn_expr_argtype(fcinfo->flinfo, argnum); + Datum arg = PG_GETARG_DATUM(argnum); + + offset->value = convert_interval_arg(cagg->partition_type, arg, &type, offset->name); + offset->type = type; } } +static void +parse_cagg_policy_config(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo, + CaggPolicyConfig *config) +{ + MemSet(config, 0, sizeof(CaggPolicyConfig)); + config->partition_type = cagg->partition_type; + /* This might seem backwards, but since we are dealing with offsets, start + * actually translates to max and end to min for maximum window. */ + config->offset_start.value = ts_time_datum_get_max(config->partition_type); + config->offset_end.value = ts_time_datum_get_min(config->partition_type); + config->offset_start.type = config->offset_end.type = + IS_TIMESTAMP_TYPE(cagg->partition_type) ? INTERVALOID : cagg->partition_type; + config->offset_start.name = CONFIG_KEY_START_OFFSET; + config->offset_end.name = CONFIG_KEY_END_OFFSET; + + parse_offset_arg(cagg, fcinfo, &config->offset_start, 1); + parse_offset_arg(cagg, fcinfo, &config->offset_end, 2); + + Assert(config->offset_start.type == config->offset_end.type); + validate_window_size(cagg, config); +} + Datum policy_refresh_cagg_add(PG_FUNCTION_ARGS) { NameData application_name; NameData refresh_name; NameData proc_name, proc_schema, owner; - Cache *hcache; - Hypertable *mat_ht; - Dimension *dim; ContinuousAgg *cagg; - int32 job_id, mat_htid; - Datum start_offset, end_offset; + CaggPolicyConfig policyconf; + int32 job_id; Interval refresh_interval; - Oid dim_type, start_offset_type, end_offset_type; Oid cagg_oid, owner_id; List *jobs; JsonbParseState *parse_state = NULL; - bool if_not_exists, start_isnull, end_isnull; + bool if_not_exists; /* Verify that the owner can create a background worker */ cagg_oid = PG_GETARG_OID(0); @@ -257,49 +471,20 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid)))); - hcache = ts_hypertable_cache_pin(); - mat_htid = cagg->data.mat_hypertable_id; - mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid); - dim = hyperspace_get_open_dimension(mat_ht->space, 0); - dim_type = ts_dimension_get_partition_type(dim); - ts_cache_release(hcache); - - /* Try to convert the argument to the time type used by the - * continuous aggregate */ - start_offset = PG_GETARG_DATUM(1); - end_offset = PG_GETARG_DATUM(2); - start_isnull = PG_ARGISNULL(1); - end_isnull = PG_ARGISNULL(2); - start_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 1); - end_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 2); - - if (!start_isnull) - start_offset = convert_interval_arg(dim_type, - start_offset, - &start_offset_type, - CONFIG_KEY_START_OFFSET); - - if (!end_isnull) - end_offset = - convert_interval_arg(dim_type, end_offset, &end_offset_type, CONFIG_KEY_END_OFFSET); - - if (!start_isnull && !end_isnull) - { - Assert(start_offset_type == end_offset_type); - check_valid_interval_values(start_offset_type, start_offset, end_offset); - } + parse_cagg_policy_config(cagg, fcinfo, &policyconf); if (PG_ARGISNULL(3)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot use NULL schedule interval"))); + refresh_interval = *PG_GETARG_INTERVAL_P(3); if_not_exists = PG_GETARG_BOOL(4); /* Make sure there is only 1 refresh policy on the cagg */ jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME, INTERNAL_SCHEMA_NAME, - mat_htid); + cagg->data.mat_hypertable_id); if (jobs != NIL) { @@ -313,14 +498,14 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) if (policy_config_check_hypertable_lag_equality(existing->fd.config, CONFIG_KEY_START_OFFSET, - dim_type, - start_offset_type, - start_offset) && + cagg->partition_type, + policyconf.offset_start.type, + policyconf.offset_start.value) && policy_config_check_hypertable_lag_equality(existing->fd.config, CONFIG_KEY_END_OFFSET, - dim_type, - end_offset_type, - end_offset)) + cagg->partition_type, + policyconf.offset_end.type, + policyconf.offset_end.value)) { /* If all arguments are the same, do nothing */ ereport(NOTICE, @@ -348,19 +533,20 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) namestrcpy(&owner, GetUserNameFromId(owner_id, false)); pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL); - ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, mat_htid); - if (!start_isnull) + ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, cagg->data.mat_hypertable_id); + if (!policyconf.offset_start.isnull) json_add_dim_interval_value(parse_state, CONFIG_KEY_START_OFFSET, - start_offset_type, - start_offset); + policyconf.offset_start.type, + policyconf.offset_start.value); else ts_jsonb_add_null(parse_state, CONFIG_KEY_START_OFFSET); - if (!end_isnull) + + if (!policyconf.offset_end.isnull) json_add_dim_interval_value(parse_state, CONFIG_KEY_END_OFFSET, - end_offset_type, - end_offset); + policyconf.offset_end.type, + policyconf.offset_end.value); else ts_jsonb_add_null(parse_state, CONFIG_KEY_END_OFFSET); JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL); @@ -376,7 +562,7 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS) &proc_name, &owner, true, - mat_htid, + cagg->data.mat_hypertable_id, config); PG_RETURN_INT32(job_id); diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index 5d8b52768..b11f46b1a 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -323,13 +323,9 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config) PolicyContinuousAggData policy_data; policy_refresh_cagg_read_and_validate_config(config, &policy_data); - elog(LOG, - "refresh continuous aggregate range %s , %s", - ts_internal_to_time_string(policy_data.refresh_window.start, - policy_data.refresh_window.type), - ts_internal_to_time_string(policy_data.refresh_window.end, - policy_data.refresh_window.type)); - continuous_agg_refresh_internal(policy_data.cagg, &policy_data.refresh_window, false); + continuous_agg_refresh_internal(policy_data.cagg, + &policy_data.refresh_window, + CAGG_REFRESH_POLICY); return true; } diff --git a/tsl/src/continuous_aggs/create.c b/tsl/src/continuous_aggs/create.c index b0b9b1a55..92e7a937c 100644 --- a/tsl/src/continuous_aggs/create.c +++ b/tsl/src/continuous_aggs/create.c @@ -1787,8 +1787,6 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void * { Oid relid; ContinuousAgg *cagg; - Hypertable *cagg_ht; - Dimension *time_dim; InternalTimeRange refresh_window = { .type = InvalidOid, }; @@ -1803,13 +1801,11 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void * relid = get_relname_relid(stmt->into->rel->relname, nspid); cagg = ts_continuous_agg_find_by_relid(relid); Assert(cagg != NULL); - cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id); - time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0); - refresh_window.type = ts_dimension_get_partition_type(time_dim); + refresh_window.type = cagg->partition_type; refresh_window.start = ts_time_get_min(refresh_window.type); refresh_window.end = ts_time_get_noend_or_max(refresh_window.type); - continuous_agg_refresh_internal(cagg, &refresh_window, true); + continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION); } return DDL_DONE; } diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 554fcc4c9..ba74c3d4b 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -274,9 +274,6 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang Oid outfuncid = InvalidOid; bool isvarlena; - if (client_min_messages > elevel) - return; - start_ts = ts_internal_to_time_value(refresh_window->start, refresh_window->type); end_ts = ts_internal_to_time_value(refresh_window->end, refresh_window->type); getTypeOutputInfo(refresh_window->type, &outfuncid, &isvarlena); @@ -360,15 +357,6 @@ get_cagg_by_relid(const Oid cagg_relid) return cagg; } -static Oid -get_partition_type_by_cagg(const ContinuousAgg *const cagg) -{ - Hypertable *cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id); - Dimension *time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0); - - return ts_dimension_get_partition_type(time_dim); -} - #define REFRESH_FUNCTION_NAME "refresh_continuous_aggregate()" /* * Refresh a continuous aggregate across the given window. @@ -383,7 +371,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) }; cagg = get_cagg_by_relid(cagg_relid); - refresh_window.type = get_partition_type_by_cagg(cagg); + refresh_window.type = cagg->partition_type; if (!PG_ARGISNULL(1)) refresh_window.start = ts_time_value_from_arg(PG_GETARG_DATUM(1), @@ -399,22 +387,31 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) else refresh_window.end = ts_time_get_noend_or_max(refresh_window.type); - continuous_agg_refresh_internal(cagg, &refresh_window, false); + continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_WINDOW); PG_RETURN_VOID(); } static void -emit_up_to_date_notice(const ContinuousAgg *cagg) +emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx) { - elog(NOTICE, - "continuous aggregate \"%s\" is already up-to-date", - NameStr(cagg->data.user_view_name)); + switch (callctx) + { + case CAGG_REFRESH_CHUNK: + case CAGG_REFRESH_WINDOW: + case CAGG_REFRESH_CREATION: + elog(NOTICE, + "continuous aggregate \"%s\" is already up-to-date", + NameStr(cagg->data.user_view_name)); + break; + case CAGG_REFRESH_POLICY: + break; + } } static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, - const InternalTimeRange *refresh_window, bool verbose, - int32 chunk_id) + const InternalTimeRange *refresh_window, + const CaggRefreshCallContext callctx, int32 chunk_id) { InvalidationStore *invalidations; Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id); @@ -432,7 +429,7 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, if (invalidations != NULL) { - if (verbose) + if (callctx == CAGG_REFRESH_CREATION) { Assert(OidIsValid(cagg->relid)); ereport(NOTICE, @@ -450,7 +447,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, void continuous_agg_refresh_internal(const ContinuousAgg *cagg, - const InternalTimeRange *refresh_window_arg, bool verbose) + const InternalTimeRange *refresh_window_arg, + const CaggRefreshCallContext callctx) { Catalog *catalog = ts_catalog_get(); int32 mat_id = cagg->data.mat_hypertable_id; @@ -486,7 +484,10 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, errhint("Align the refresh window with the bucket" " time zone or use at least two buckets."))); - log_refresh_window(DEBUG1, cagg, &refresh_window, "refreshing continuous aggregate"); + log_refresh_window(callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1, + cagg, + &refresh_window, + "refreshing continuous aggregate"); /* Perform the refresh across two transactions. * @@ -530,7 +531,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, * nothing to refresh in that case */ if (refresh_window.start >= refresh_window.end) { - emit_up_to_date_notice(cagg); + emit_up_to_date_notice(cagg, callctx); return; } @@ -544,8 +545,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, StartTransactionCommand(); cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id); - if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, verbose, INVALID_CHUNK_ID)) - emit_up_to_date_notice(cagg); + if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID)) + emit_up_to_date_notice(cagg, callctx); } /* @@ -565,7 +566,7 @@ continuous_agg_refresh_chunk(PG_FUNCTION_ARGS) Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true); Catalog *catalog = ts_catalog_get(); const InternalTimeRange refresh_window = { - .type = get_partition_type_by_cagg(cagg), + .type = cagg->partition_type, .start = ts_chunk_primary_dimension_start(chunk), .end = ts_chunk_primary_dimension_end(chunk), }; @@ -596,7 +597,7 @@ continuous_agg_refresh_chunk(PG_FUNCTION_ARGS) invalidation_process_hypertable_log(cagg); /* Must make invalidation processing visible */ CommandCounterIncrement(); - process_cagg_invalidations_and_refresh(cagg, &refresh_window, false, chunk->fd.id); + process_cagg_invalidations_and_refresh(cagg, &refresh_window, CAGG_REFRESH_CHUNK, chunk->fd.id); PG_RETURN_VOID(); } diff --git a/tsl/src/continuous_aggs/refresh.h b/tsl/src/continuous_aggs/refresh.h index ee48e3484..a812e6636 100644 --- a/tsl/src/continuous_aggs/refresh.h +++ b/tsl/src/continuous_aggs/refresh.h @@ -12,9 +12,18 @@ #include "materialize.h" +typedef enum CaggRefreshCallContext +{ + CAGG_REFRESH_CREATION, + CAGG_REFRESH_WINDOW, + CAGG_REFRESH_CHUNK, + CAGG_REFRESH_POLICY, +} CaggRefreshCallContext; + extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS); extern Datum continuous_agg_refresh_chunk(PG_FUNCTION_ARGS); extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg, - const InternalTimeRange *refresh_window, bool verbose); + const InternalTimeRange *refresh_window, + const CaggRefreshCallContext callctx); #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H */ diff --git a/tsl/test/expected/continuous_aggs_bgw.out b/tsl/test/expected/continuous_aggs_bgw.out index cc3c80de1..5edc3b3d5 100644 --- a/tsl/test/expected/continuous_aggs_bgw.out +++ b/tsl/test/expected/continuous_aggs_bgw.out @@ -138,11 +138,11 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); (1 row) SELECT * FROM sorted_bgw_log; - msg_no | mock_time | application_name | msg ---------+-----------+--------------------------------------------+---------------------------------------------------- + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+----------------------------------------------------------------------------------------- 0 | 0 | DB Scheduler | [TESTING] Registered new background worker 1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0 - 0 | 0 | Refresh Continuous Aggregate Policy [1000] | refresh continuous aggregate range -2147483648 , 6 + 0 | 0 | Refresh Continuous Aggregate Policy [1000] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -2147483648, 6 ] (3 rows) SELECT * FROM _timescaledb_config.bgw_job where id=:job_id; @@ -356,7 +356,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view AS SELECT time_bucket('2', time), SUM(data) as value FROM test_continuous_agg_table GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval); +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); add_continuous_aggregate_policy --------------------------------- 1001 @@ -371,11 +371,11 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25); (1 row) SELECT * FROM sorted_bgw_log; - msg_no | mock_time | application_name | msg ---------+-----------+--------------------------------------------+----------------------------------------------------- + msg_no | mock_time | application_name | msg +--------+-----------+--------------------------------------------+---------------------------------------------------------------------------------- 0 | 0 | DB Scheduler | [TESTING] Registered new background worker 1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0 - 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refresh continuous aggregate range -2147483648 , 12 + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] (3 rows) -- job ran once, successfully @@ -416,14 +416,14 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25); (1 row) SELECT * FROM sorted_bgw_log; - msg_no | mock_time | application_name | msg ---------+-------------+--------------------------------------------+---------------------------------------------------------- + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+---------------------------------------------------------------------------------- 0 | 0 | DB Scheduler | [TESTING] Registered new background worker 1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0 - 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refresh continuous aggregate range -2147483648 , 12 + 0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker 1 | 43200000000 | DB Scheduler | [TESTING] Wait until 43200025000, started at 43200000000 - 0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refresh continuous aggregate range -2147483648 , 12 + 0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ] (6 rows) SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes @@ -484,7 +484,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view AS SELECT time_bucket('2', time), SUM(data) as value, get_constant_no_perms() FROM test_continuous_agg_table GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval); +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); add_continuous_aggregate_policy --------------------------------- 1002 @@ -635,11 +635,11 @@ SELECT * FROM test_continuous_agg_view_user_2; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER SELECT * from sorted_bgw_log; - msg_no | mock_time | application_name | msg ---------+-------------+--------------------------------------------+--------------------------------------------------------------- + msg_no | mock_time | application_name | msg +--------+-------------+--------------------------------------------+------------------------------------------------------------------------------------------------ 0 | 0 | DB Scheduler | [TESTING] Registered new background worker 1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0 - 0 | 0 | Refresh Continuous Aggregate Policy [1003] | refresh continuous aggregate range -2147483648 , 3 + 0 | 0 | Refresh Continuous Aggregate Policy [1003] | refreshing continuous aggregate "test_continuous_agg_view_user_2" in window [ -2147483648, 2 ] 0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker 1 | 43200000000 | DB Scheduler | [TESTING] Wait until 43200025000, started at 43200000000 0 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | job 1003 threw an error diff --git a/tsl/test/expected/continuous_aggs_policy.out b/tsl/test/expected/continuous_aggs_policy.out index 7b2e79068..00a0b5163 100644 --- a/tsl/test/expected/continuous_aggs_policy.out +++ b/tsl/test/expected/continuous_aggs_policy.out @@ -45,23 +45,38 @@ SELECT count(*) FROM _timescaledb_config.bgw_job; (1 row) \set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval); ERROR: "int_tab" is not a continuous aggregate SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval); ERROR: invalid parameter value for start_offset +HINT: Use time interval of type integer with the continuous aggregate. SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 ); -ERROR: function add_continuous_aggregate_policy(unknown, interval, integer) does not exist at character 8 +ERROR: function add_continuous_aggregate_policy(unknown, interval, integer) does not exist +LINE 1: SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::in... + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. SELECT add_continuous_aggregate_policy('mat_m1', 10, '1 day'::interval, '1 h'::interval); ERROR: invalid parameter value for end_offset +HINT: Use time interval of type integer with the continuous aggregate. --start_interval < end_interval -SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval) as job_id \gset -ERROR: start interval should be greater than end interval +SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "integer". +HINT: Use a start and end offset that specifies a window of at least 2. +--refresh window less than two buckets +SELECT add_continuous_aggregate_policy('mat_m1', 11, 10, '1h'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "integer". +HINT: Use a start and end offset that specifies a window of at least 2. SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id \gset --adding again should warn/error SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>false); ERROR: continuous aggregate policy already exists for "mat_m1" SELECT add_continuous_aggregate_policy('mat_m1', 20, 15, '1h'::interval, if_not_exists=>true); WARNING: continuous aggregate policy already exists for "mat_m1" +DETAIL: A policy already exists with different arguments. +HINT: Remove the existing policy before adding a new one. add_continuous_aggregate_policy --------------------------------- -1 @@ -82,6 +97,8 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; (1 row) SELECT hypertable_id as mat_id FROM _timescaledb_config.bgw_job where id = :job_id \gset +\set VERBOSITY terse +\set ON_ERROR_STOP 1 \c :TEST_DBNAME :ROLE_SUPERUSER UPDATE _timescaledb_config.bgw_job SET config = jsonb_build_object('mat_hypertable_id', :mat_id) @@ -93,6 +110,8 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; {"mat_hypertable_id": 2} (1 row) +\set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true); ERROR: could not find start_offset in config for existing job SELECT remove_continuous_aggregate_policy('int_tab'); @@ -118,6 +137,8 @@ NOTICE: continuous aggregate policy not found for "mat_m1", skipping SET ROLE :ROLE_DEFAULT_PERM_USER_2; SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id ; ERROR: must be owner of continuous aggregate "mat_m1" +\set VERBOSITY terse +\set ON_ERROR_STOP 1 SET ROLE :ROLE_DEFAULT_PERM_USER; DROP MATERIALIZED VIEW mat_m1; --- code coverage tests : add policy for timestamp and date based table --- @@ -130,31 +151,75 @@ NOTICE: adding not-null constraint to column "time" (1 row) CREATE MATERIALIZED VIEW max_mat_view_date - WITH (timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('7 days', time) - FROM continuous_agg_max_mat_date - GROUP BY 1 WITH NO DATA; + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('7 days', time) + FROM continuous_agg_max_mat_date + GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval); ERROR: invalid parameter value for end_offset +HINT: Use time interval with a continuous aggregate using timestamp-based time bucket. --start_interval < end_interval SELECT add_continuous_aggregate_policy('max_mat_view_date', '1 day'::interval, '2 days'::interval , '1 day'::interval) ; -ERROR: start interval should be greater than end interval +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date". +HINT: Use a start and end offset that specifies a window of at least @ 14 days. +--interval less than two buckets +SELECT add_continuous_aggregate_policy('max_mat_view_date', '7 days', '1 day', '1 day'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date". +HINT: Use a start and end offset that specifies a window of at least @ 14 days. +SELECT add_continuous_aggregate_policy('max_mat_view_date', '14 days', '1 day', '1 day'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date". +HINT: Use a start and end offset that specifies a window of at least @ 14 days. +SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-10 hours', '1 day'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date". +HINT: Use a start and end offset that specifies a window of at least @ 14 days. +\set VERBOSITY terse \set ON_ERROR_STOP 1 -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset +-- Negative start offset gives two bucket window: +SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1001 +(1 row) + +SELECT remove_continuous_aggregate_policy('max_mat_view_date'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + +-- Both offsets NULL: +SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1002 +(1 row) + +SELECT remove_continuous_aggregate_policy('max_mat_view_date'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + +SELECT add_continuous_aggregate_policy('max_mat_view_date', '15 days', '1 day', '1 day'::interval) as job_id \gset SELECT config FROM _timescaledb_config.bgw_job WHERE id = :job_id; - config -------------------------------------------------------------------------------- - {"end_offset": "@ 1 day", "start_offset": "@ 2 days", "mat_hypertable_id": 4} + config +-------------------------------------------------------------------------------- + {"end_offset": "@ 1 day", "start_offset": "@ 15 days", "mat_hypertable_id": 4} (1 row) INSERT INTO continuous_agg_max_mat_date - SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day'); + SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day'); --- to prevent NOTICES set message level to warning -SET client_min_messages TO warning; +SET client_min_messages TO warning; CALL run_job(:job_id); -RESET client_min_messages ; +RESET client_min_messages; DROP MATERIALIZED VIEW max_mat_view_date; CREATE TABLE continuous_agg_timestamp(time TIMESTAMP); SELECT create_hypertable('continuous_agg_timestamp', 'time'); @@ -165,42 +230,48 @@ NOTICE: adding not-null constraint to column "time" (1 row) CREATE MATERIALIZED VIEW max_mat_view_timestamp - WITH (timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('7 days', time) - FROM continuous_agg_timestamp - GROUP BY 1 WITH NO DATA; + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('7 days', time) + FROM continuous_agg_timestamp + GROUP BY 1 WITH NO DATA; +--the start offset overflows the smallest time value, but is capped at +--the min value +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval); + add_continuous_aggregate_policy +--------------------------------- + 1004 +(1 row) + +SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); + remove_continuous_aggregate_policy +------------------------------------ + +(1 row) + \set ON_ERROR_STOP 0 ---will overflow at runtime even though policy check works -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval) as job_id \gset -CALL run_job(:job_id); -ERROR: timestamp out of range --- bad timestamps at runtime even though policy check works -SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); - remove_continuous_aggregate_policy ------------------------------------- - -(1 row) - -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval) as job_id \gset -CALL run_job(:job_id); -ERROR: invalid refresh window +\set VERBOSITY default +--start and end offset capped at the lowest time value, which means +--zero size window +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '900000 years' , '1 h'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "timestamp without time zone". +HINT: Use a start and end offset that specifies a window of at least @ 14 days. +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval); +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "timestamp without time zone". +HINT: Use a start and end offset that specifies a window of at least @ 14 days. +\set VERBOSITY terse \set ON_ERROR_STOP 1 -SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); - remove_continuous_aggregate_policy ------------------------------------- - -(1 row) - -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset --- to prevent NOTICES set message level to warning -SET client_min_messages TO warning; +SET client_min_messages TO warning; CALL run_job(:job_id); RESET client_min_messages ; SELECT config FROM _timescaledb_config.bgw_job WHERE id = :job_id; config --------------------------------------------------------------------------------- - {"end_offset": "@ 1 hour", "start_offset": "@ 10 days", "mat_hypertable_id": 6} + {"end_offset": "@ 1 hour", "start_offset": "@ 15 days", "mat_hypertable_id": 6} (1 row) \c :TEST_DBNAME :ROLE_SUPERUSER @@ -215,7 +286,7 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; (1 row) \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true); ERROR: could not find start_offset in config for job SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true); ERROR: invalid input syntax for type interval: "xyz" @@ -244,12 +315,17 @@ SELECT time_bucket( SMALLINT '1', a) , count(*) FROM smallint_tab GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('mat_smallint', 15, 0 , '1 h'::interval); ERROR: invalid parameter value for start_offset +HINT: Use time interval of type smallint with the continuous aggregate. SELECT add_continuous_aggregate_policy('mat_smallint', 98898::smallint , 0::smallint, '1 h'::interval); ERROR: smallint out of range SELECT add_continuous_aggregate_policy('mat_smallint', 5::smallint, 10::smallint , '1 h'::interval) as job_id \gset -ERROR: start interval should be greater than end interval +ERROR: policy refresh window too small +DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "smallint". +HINT: Use a start and end offset that specifies a window of at least 2. +\set VERBOSITY terse \set ON_ERROR_STOP 1 SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 0::smallint , '1 h'::interval) as job_id \gset INSERT INTO smallint_tab VALUES(5); @@ -296,7 +372,7 @@ SELECT * FROM mat_smallint ORDER BY 1; ---+-------- (0 rows) --- overflow start_interval. now this runs as range is capped [-32768, -32765) +-- overflow start_interval. now this runs as range is capped [-32768, -32765) INSERT INTO smallint_tab VALUES( -32760 ); SELECT maxval, maxval - 10, maxval -5 FROM integer_now_smallint_tab() as maxval; maxval | ?column? | ?column? @@ -320,7 +396,7 @@ SELECT * FROM mat_smallint ORDER BY 1; (0 rows) -- Case 2: overflow by subtracting from PG_INT16_MAX ---overflow start and end . will fail as range is [32767, 32767] +--overflow start and end . will fail as range is [32767, 32767] SELECT remove_continuous_aggregate_policy('mat_smallint'); remove_continuous_aggregate_policy ------------------------------------ @@ -335,7 +411,7 @@ SELECT maxval, maxval - (-1), maxval - (-2) FROM integer_now_smallint_tab() as m 32767 | 32768 | 32769 (1 row) -SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -2::smallint , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -3::smallint , '1 h'::interval) as job_id \gset \set ON_ERROR_STOP 0 CALL run_job(:job_id); ERROR: invalid refresh window @@ -351,14 +427,14 @@ SELECT remove_continuous_aggregate_policy('mat_smallint'); (1 row) ---overflow end . will work range is [32765, 32767) +--overflow end . will work range is [32765, 32767) SELECT maxval, maxval - (1), maxval - (-2) FROM integer_now_smallint_tab() as maxval; maxval | ?column? | ?column? --------+----------+---------- 32767 | 32766 | 32769 (1 row) -SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -2::smallint , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -3::smallint , '1 h'::interval) as job_id \gset \set ON_ERROR_STOP 0 CALL run_job(:job_id); SELECT * FROM mat_smallint ORDER BY 1; @@ -409,7 +485,7 @@ FROM bigint_tab GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 SELECT add_continuous_aggregate_policy('mat_bigint', 5::bigint, 10::bigint , '1 h'::interval) ; -ERROR: start interval should be greater than end interval +ERROR: policy refresh window too small \set ON_ERROR_STOP 1 SELECT add_continuous_aggregate_policy('mat_bigint', 15::bigint, 0::bigint , '1 h'::interval) as job_mid \gset INSERT INTO bigint_tab VALUES(5); diff --git a/tsl/test/expected/continuous_aggs_policy_run.out b/tsl/test/expected/continuous_aggs_policy_run.out index 8bdb5e520..84a983ef2 100644 --- a/tsl/test/expected/continuous_aggs_policy_run.out +++ b/tsl/test/expected/continuous_aggs_policy_run.out @@ -55,7 +55,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp AS SELECT time_bucket('7 days', time), count(*) FROM continuous_agg_timestamp GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset INSERT INTO continuous_agg_timestamp SELECT generate_series('2019-09-01 00:00'::timestamp, '2019-09-10 00:00'::timestamp, '1 day'); --- to prevent NOTICES set message level to warning diff --git a/tsl/test/sql/continuous_aggs_bgw.sql b/tsl/test/sql/continuous_aggs_bgw.sql index 589c309b8..9edec6773 100644 --- a/tsl/test/sql/continuous_aggs_bgw.sql +++ b/tsl/test/sql/continuous_aggs_bgw.sql @@ -227,7 +227,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view FROM test_continuous_agg_table GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval); +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset SELECT id AS job_id FROM _timescaledb_config.bgw_job WHERE hypertable_id=:mat_hypertable_id \gset @@ -288,7 +288,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view FROM test_continuous_agg_table GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval); +SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval); SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset diff --git a/tsl/test/sql/continuous_aggs_policy.sql b/tsl/test/sql/continuous_aggs_policy.sql index f7df055c8..cff626daa 100644 --- a/tsl/test/sql/continuous_aggs_policy.sql +++ b/tsl/test/sql/continuous_aggs_policy.sql @@ -40,12 +40,15 @@ SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT count(*) FROM _timescaledb_config.bgw_job; \set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval); SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval); SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 ); SELECT add_continuous_aggregate_policy('mat_m1', 10, '1 day'::interval, '1 h'::interval); --start_interval < end_interval -SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval); +--refresh window less than two buckets +SELECT add_continuous_aggregate_policy('mat_m1', 11, 10, '1h'::interval); SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id \gset --adding again should warn/error @@ -56,12 +59,18 @@ SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_ -- modify config and try to add, should error out SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; SELECT hypertable_id as mat_id FROM _timescaledb_config.bgw_job where id = :job_id \gset +\set VERBOSITY terse +\set ON_ERROR_STOP 1 + \c :TEST_DBNAME :ROLE_SUPERUSER UPDATE _timescaledb_config.bgw_job SET config = jsonb_build_object('mat_hypertable_id', :mat_id) WHERE id = :job_id; SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; + +\set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true); SELECT remove_continuous_aggregate_policy('int_tab'); @@ -74,6 +83,8 @@ SELECT remove_continuous_aggregate_policy('mat_m1', if_not_exists=>true); --should fail SET ROLE :ROLE_DEFAULT_PERM_USER_2; SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id ; +\set VERBOSITY terse +\set ON_ERROR_STOP 1 SET ROLE :ROLE_DEFAULT_PERM_USER; DROP MATERIALIZED VIEW mat_m1; @@ -82,52 +93,69 @@ DROP MATERIALIZED VIEW mat_m1; CREATE TABLE continuous_agg_max_mat_date(time DATE); SELECT create_hypertable('continuous_agg_max_mat_date', 'time'); CREATE MATERIALIZED VIEW max_mat_view_date - WITH (timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('7 days', time) - FROM continuous_agg_max_mat_date - GROUP BY 1 WITH NO DATA; + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('7 days', time) + FROM continuous_agg_max_mat_date + GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval); --start_interval < end_interval SELECT add_continuous_aggregate_policy('max_mat_view_date', '1 day'::interval, '2 days'::interval , '1 day'::interval) ; +--interval less than two buckets +SELECT add_continuous_aggregate_policy('max_mat_view_date', '7 days', '1 day', '1 day'::interval); +SELECT add_continuous_aggregate_policy('max_mat_view_date', '14 days', '1 day', '1 day'::interval); +SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-10 hours', '1 day'::interval); +\set VERBOSITY terse \set ON_ERROR_STOP 1 -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset + +-- Negative start offset gives two bucket window: +SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval); +SELECT remove_continuous_aggregate_policy('max_mat_view_date'); +-- Both offsets NULL: +SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval); +SELECT remove_continuous_aggregate_policy('max_mat_view_date'); + +SELECT add_continuous_aggregate_policy('max_mat_view_date', '15 days', '1 day', '1 day'::interval) as job_id \gset SELECT config FROM _timescaledb_config.bgw_job WHERE id = :job_id; INSERT INTO continuous_agg_max_mat_date - SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day'); + SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day'); --- to prevent NOTICES set message level to warning -SET client_min_messages TO warning; +SET client_min_messages TO warning; CALL run_job(:job_id); -RESET client_min_messages ; +RESET client_min_messages; DROP MATERIALIZED VIEW max_mat_view_date; CREATE TABLE continuous_agg_timestamp(time TIMESTAMP); SELECT create_hypertable('continuous_agg_timestamp', 'time'); CREATE MATERIALIZED VIEW max_mat_view_timestamp - WITH (timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('7 days', time) - FROM continuous_agg_timestamp - GROUP BY 1 WITH NO DATA; + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS SELECT time_bucket('7 days', time) + FROM continuous_agg_timestamp + GROUP BY 1 WITH NO DATA; + +--the start offset overflows the smallest time value, but is capped at +--the min value +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval); +SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); \set ON_ERROR_STOP 0 ---will overflow at runtime even though policy check works -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval) as job_id \gset -CALL run_job(:job_id); - --- bad timestamps at runtime even though policy check works -SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval) as job_id \gset -CALL run_job(:job_id); - +\set VERBOSITY default +--start and end offset capped at the lowest time value, which means +--zero size window +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '900000 years' , '1 h'::interval); +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval); +\set VERBOSITY terse \set ON_ERROR_STOP 1 -SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset + +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset + --- to prevent NOTICES set message level to warning -SET client_min_messages TO warning; +SET client_min_messages TO warning; CALL run_job(:job_id); RESET client_min_messages ; @@ -142,7 +170,7 @@ WHERE id = :job_id; SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; \set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true); +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true); SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true); \set ON_ERROR_STOP 1 @@ -161,9 +189,11 @@ SELECT time_bucket( SMALLINT '1', a) , count(*) FROM smallint_tab GROUP BY 1 WITH NO DATA; \set ON_ERROR_STOP 0 +\set VERBOSITY default SELECT add_continuous_aggregate_policy('mat_smallint', 15, 0 , '1 h'::interval); SELECT add_continuous_aggregate_policy('mat_smallint', 98898::smallint , 0::smallint, '1 h'::interval); SELECT add_continuous_aggregate_policy('mat_smallint', 5::smallint, 10::smallint , '1 h'::interval) as job_id \gset +\set VERBOSITY terse \set ON_ERROR_STOP 1 SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 0::smallint , '1 h'::interval) as job_id \gset INSERT INTO smallint_tab VALUES(5); @@ -189,7 +219,7 @@ CALL run_job(:job_id); \set ON_ERROR_STOP 1 SELECT * FROM mat_smallint ORDER BY 1; --- overflow start_interval. now this runs as range is capped [-32768, -32765) +-- overflow start_interval. now this runs as range is capped [-32768, -32765) INSERT INTO smallint_tab VALUES( -32760 ); SELECT maxval, maxval - 10, maxval -5 FROM integer_now_smallint_tab() as maxval; CALL run_job(:job_id); @@ -201,21 +231,21 @@ CALL refresh_continuous_aggregate('mat_smallint', NULL, NULL); SELECT * FROM mat_smallint ORDER BY 1; -- Case 2: overflow by subtracting from PG_INT16_MAX ---overflow start and end . will fail as range is [32767, 32767] +--overflow start and end . will fail as range is [32767, 32767] SELECT remove_continuous_aggregate_policy('mat_smallint'); INSERT INTO smallint_tab VALUES( 32766 ); INSERT INTO smallint_tab VALUES( 32767 ); SELECT maxval, maxval - (-1), maxval - (-2) FROM integer_now_smallint_tab() as maxval; -SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -2::smallint , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -3::smallint , '1 h'::interval) as job_id \gset \set ON_ERROR_STOP 0 CALL run_job(:job_id); \set ON_ERROR_STOP 1 SELECT * FROM mat_smallint ORDER BY 1; SELECT remove_continuous_aggregate_policy('mat_smallint'); ---overflow end . will work range is [32765, 32767) +--overflow end . will work range is [32765, 32767) SELECT maxval, maxval - (1), maxval - (-2) FROM integer_now_smallint_tab() as maxval; -SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -2::smallint , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -3::smallint , '1 h'::interval) as job_id \gset \set ON_ERROR_STOP 0 CALL run_job(:job_id); SELECT * FROM mat_smallint ORDER BY 1; diff --git a/tsl/test/sql/continuous_aggs_policy_run.sql b/tsl/test/sql/continuous_aggs_policy_run.sql index 5f7488966..d252f1a47 100644 --- a/tsl/test/sql/continuous_aggs_policy_run.sql +++ b/tsl/test/sql/continuous_aggs_policy_run.sql @@ -40,7 +40,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp FROM continuous_agg_timestamp GROUP BY 1 WITH NO DATA; -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset +SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset INSERT INTO continuous_agg_timestamp SELECT generate_series('2019-09-01 00:00'::timestamp, '2019-09-10 00:00'::timestamp, '1 day'); --- to prevent NOTICES set message level to warning diff --git a/tsl/test/sql/continuous_aggs_refresh.sql b/tsl/test/sql/continuous_aggs_refresh.sql index a801302ab..c64b637d9 100644 --- a/tsl/test/sql/continuous_aggs_refresh.sql +++ b/tsl/test/sql/continuous_aggs_refresh.sql @@ -109,7 +109,6 @@ CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-01'); -- Bad time input CALL refresh_continuous_aggregate('daily_temp', '2020-05-01'::text, '2020-05-03'::text); CALL refresh_continuous_aggregate('daily_temp', 0, '2020-05-01'); - \set ON_ERROR_STOP 1 -- Test different time types