Validate continuous aggregate policy

This change adds validation of the settings for a continuous aggregate
policy when the policy is created. Previously it was possible to
create policies that would either fail at runtime or never refresh
anything due to bad configuration.

In particular, the refresh window (start and end offsets for
refreshing) must now be at least two buckets in size or an error is
generated when the policy is created. The policy must cover at least
two buckets to ensure that at least one bucket is refreshed when the
policy runs, since it is unlikely that the policy runs at a time that
is perfectly aligned with the beginning of a bucket.

Note that it is still possible to create policies that might not
refresh anything depending on the time when it runs. For instance, if
the "current" time is close to the minimum allowed time value, the
refresh window can lag enough to fall outside the valid time range
(e.g., the end offset is big enough to push the window outside the
valid time range). As time moves on, the window would eventually move
into the valid time range, however.

Fixes #2929
This commit is contained in:
Erik Nordström 2021-02-11 13:16:29 +01:00 committed by Erik Nordström
parent ff7fb0aedf
commit 82a30ae420
17 changed files with 595 additions and 272 deletions

View File

@ -12,6 +12,7 @@ accidentally triggering the load of a previous DB version.**
**Bugfixes**
* #2883 Fix join qual propagation for nested joins
* #2908 Fix changing column type of clustered hypertables
* #2942 Validate continuous aggregate policy
**Thanks**
* @zeeshanshabbir93 for reporting an issue with joins

View File

@ -199,9 +199,18 @@ static void
continuous_agg_init(ContinuousAgg *cagg, const Form_continuous_agg fd)
{
	/* Populate the in-memory cagg from its catalog form data, caching the
	 * user view's relid and the partitioning (time) dimension type. */
	Oid view_nspid = get_namespace_oid(NameStr(fd->user_view_schema), false);
	Hypertable *mat_ht = ts_hypertable_get_by_id(fd->mat_hypertable_id);
	Dimension *open_dim;

	Assert(mat_ht != NULL);
	open_dim = hyperspace_get_open_dimension(mat_ht->space, 0);
	Assert(open_dim != NULL);

	memcpy(&cagg->data, fd, sizeof(cagg->data));
	cagg->relid = get_relname_relid(NameStr(fd->user_view_name), view_nspid);
	cagg->partition_type = ts_dimension_get_partition_type(open_dim);

	Assert(OidIsValid(cagg->relid));
	Assert(OidIsValid(cagg->partition_type));
}
TSDLLEXPORT ContinuousAggHypertableStatus
@ -294,12 +303,11 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)
return ca;
}
ContinuousAgg *
ts_continuous_agg_find_by_view_name(const char *schema, const char *name,
ContinuousAggViewType type)
static bool
continuous_agg_fill_form_data(const char *schema, const char *name, ContinuousAggViewType type,
FormData_continuous_agg *fd)
{
ScanIterator iterator;
ContinuousAgg *ca = NULL;
AttrNumber view_name_attrnum = 0;
AttrNumber schema_name_attrnum = 0;
int count = 0;
@ -353,8 +361,7 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name,
if (vtype != ContinuousAggAnyView)
{
ca = ts_scan_iterator_alloc_result(&iterator, sizeof(*ca));
continuous_agg_init(ca, data);
memcpy(fd, data, sizeof(*fd));
count++;
}
@ -364,6 +371,22 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name,
Assert(count <= 1);
return count == 1;
}
ContinuousAgg *
ts_continuous_agg_find_by_view_name(const char *schema, const char *name,
									ContinuousAggViewType type)
{
	/* Look up the catalog entry for the named view; return NULL when the
	 * view is not (part of) a continuous aggregate of the given type. */
	FormData_continuous_agg form;
	ContinuousAgg *cagg;

	if (!continuous_agg_fill_form_data(schema, name, type, &form))
		return NULL;

	cagg = palloc0(sizeof(*cagg));
	continuous_agg_init(cagg, &form);

	return cagg;
}
@ -626,12 +649,12 @@ ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id)
/* Block dropping the partial and direct view if the continuous aggregate still exists */
static void
drop_internal_view(ContinuousAgg *agg)
drop_internal_view(const FormData_continuous_agg *fd)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
int count = 0;
init_scan_by_mat_hypertable_id(&iterator, agg->data.mat_hypertable_id);
init_scan_by_mat_hypertable_id(&iterator, fd->mat_hypertable_id);
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
@ -647,25 +670,37 @@ drop_internal_view(ContinuousAgg *agg)
}
/* This gets called when a view gets dropped. */
void
ts_continuous_agg_drop_view_callback(ContinuousAgg *ca, const char *schema, const char *name)
static void
continuous_agg_drop_view_callback(FormData_continuous_agg *fd, const char *schema, const char *name)
{
ContinuousAggViewType vtyp;
vtyp = ts_continuous_agg_view_type(&ca->data, schema, name);
vtyp = ts_continuous_agg_view_type(fd, schema, name);
switch (vtyp)
{
case ContinuousAggUserView:
drop_continuous_agg(&ca->data, false /* The user view has already been dropped */);
drop_continuous_agg(fd, false /* The user view has already been dropped */);
break;
case ContinuousAggPartialView:
case ContinuousAggDirectView:
drop_internal_view(ca);
drop_internal_view(fd);
break;
default:
elog(ERROR, "unknown continuous aggregate view type");
}
}
bool
ts_continuous_agg_drop(const char *view_schema, const char *view_name)
{
FormData_continuous_agg fd;
bool found = continuous_agg_fill_form_data(view_schema, view_name, ContinuousAggAnyView, &fd);
if (found)
continuous_agg_drop_view_callback(&fd, view_schema, view_name);
return found;
}
static inline bool
ts_continuous_agg_is_user_view_schema(FormData_continuous_agg *data, const char *schema)
{

View File

@ -37,7 +37,10 @@ extern TSDLLEXPORT WithClauseResult *ts_continuous_agg_with_clause_parse(const L
/*
 * In-memory representation of a continuous aggregate.
 *
 * Caches the user view's relid and the primary partitioning dimension's
 * type next to the raw catalog form data, so callers do not have to look
 * up the materialization hypertable themselves (see continuous_agg_init()).
 */
typedef struct ContinuousAgg
{
/* Raw catalog row for this continuous aggregate */
FormData_continuous_agg data;
/* Relid of the user-facing view */
Oid relid;
/* Type of the primary partitioning dimension */
Oid partition_type;
} ContinuousAgg;
typedef enum ContinuousAggHypertableStatus
@ -69,9 +72,7 @@ extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_relid(Oid relid);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_rv(const RangeVar *rv);
extern void ts_continuous_agg_drop_view_callback(ContinuousAgg *ca, const char *schema,
const char *name);
extern bool ts_continuous_agg_drop(const char *view_schema, const char *view_name);
extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id);
extern TSDLLEXPORT ContinuousAggViewType ts_continuous_agg_view_type(FormData_continuous_agg *data,

View File

@ -3827,14 +3827,7 @@ process_drop_trigger(EventTriggerDropObject *obj)
/*
 * Event-trigger handler for a dropped view.
 *
 * Delegates to ts_continuous_agg_drop(), which is a no-op when the view
 * is not part of a continuous aggregate.
 *
 * NOTE: the diff extraction had merged the removed
 * find-then-callback sequence with its one-line replacement; this is the
 * post-change version.
 */
static void
process_drop_view(EventTriggerDropView *dropped_view)
{
	ts_continuous_agg_drop(dropped_view->schema, dropped_view->view_name);
}
static void

View File

@ -368,7 +368,7 @@ BEGIN
IF ts_version < '2.0.0' THEN
CREATE VIEW mat_inttime
WITH ( timescaledb.continuous, timescaledb.materialized_only=true,
timescaledb.ignore_invalidation_older_than = 5,
timescaledb.ignore_invalidation_older_than = 6,
timescaledb.refresh_lag = 2,
timescaledb.refresh_interval='12 hours')
AS
@ -400,7 +400,7 @@ BEGIN
FROM int_time_test
GROUP BY 1 WITH NO DATA;
PERFORM add_continuous_aggregate_policy('mat_inttime', 5, 2, '12 hours');
PERFORM add_continuous_aggregate_policy('mat_inttime', 6, 2, '12 hours');
PERFORM add_continuous_aggregate_policy('mat_inttime2', NULL, 2, '12 hours');
END IF;
END $$;

View File

@ -5,6 +5,8 @@
*/
#include <postgres.h>
#include <access/xact.h>
#include <common/int128.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>
#include <utils/acl.h>
@ -50,36 +52,62 @@ policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config)
}
static int64
get_interval_from_config(const Dimension *dim, const Jsonb *config, const char *json_label,
bool *isnull)
get_time_from_interval(const Dimension *dim, Datum interval, Oid type)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
if (IS_INTEGER_TYPE(type))
{
Oid now_func = ts_get_integer_now_func(dim);
int64 value = ts_interval_value_to_internal(interval, type);
Assert(now_func);
return ts_subtract_integer_from_now_saturating(now_func, value, partitioning_type);
}
else if (type == INTERVALOID)
{
Datum res = subtract_interval_from_now(DatumGetIntervalP(interval), partitioning_type);
return ts_time_value_to_internal(res, partitioning_type);
}
else
elog(ERROR, "unsupported offset type for continuous aggregate policy");
pg_unreachable();
return 0;
}
/*
 * Read an offset from the policy's JSONB config and convert it to an
 * internal time value relative to "now".
 *
 * Sets *isnull and returns 0 when the key is absent (or NULL) in the
 * config; the caller substitutes a type min/max default in that case.
 *
 * NOTE: the diff extraction had left both the removed inline
 * subtract-from-now code and its get_time_from_interval() replacement in
 * the body (duplicate, unreachable returns); this is the post-change
 * version.
 */
static int64
get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json_label,
					 bool *isnull)
{
	Oid partitioning_type = ts_dimension_get_partition_type(dim);

	*isnull = false;

	if (IS_INTEGER_TYPE(partitioning_type))
	{
		bool found;
		int64 interval_val = ts_jsonb_get_int64_field(config, json_label, &found);

		if (!found)
		{
			*isnull = true;
			return 0;
		}

		return get_time_from_interval(dim, Int64GetDatum(interval_val), INT8OID);
	}
	else
	{
		Interval *interval_val = ts_jsonb_get_interval_field(config, json_label);

		if (!interval_val)
		{
			*isnull = true;
			return 0;
		}

		return get_time_from_interval(dim, IntervalPGetDatum(interval_val), INTERVALOID);
	}
}
@ -87,7 +115,7 @@ int64
policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config)
{
bool start_isnull;
int64 res = get_interval_from_config(dim, config, CONFIG_KEY_START_OFFSET, &start_isnull);
int64 res = get_time_from_config(dim, config, CONFIG_KEY_START_OFFSET, &start_isnull);
/* interpret NULL as min value for that type */
if (start_isnull)
return ts_time_get_min(ts_dimension_get_partition_type(dim));
@ -98,7 +126,7 @@ int64
policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config)
{
bool end_isnull;
int64 res = get_interval_from_config(dim, config, CONFIG_KEY_END_OFFSET, &end_isnull);
int64 res = get_time_from_config(dim, config, CONFIG_KEY_END_OFFSET, &end_isnull);
if (end_isnull)
return ts_time_get_end_or_max(ts_dimension_get_partition_type(dim));
return res;
@ -159,12 +187,13 @@ static Datum
convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg)
{
Oid convert_to = dim_type;
Datum converted;
if (IS_TIMESTAMP_TYPE(dim_type))
convert_to = INTERVALOID;
if (*interval_type != convert_to)
{
if (IS_TIMESTAMP_TYPE(dim_type))
convert_to = INTERVALOID;
if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT))
{
if (IS_INTEGER_TYPE(dim_type))
@ -183,68 +212,253 @@ convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const cha
}
}
return ts_time_datum_convert_arg(interval, interval_type, convert_to);
converted = ts_time_datum_convert_arg(interval, interval_type, convert_to);
/* For integer types, first convert all types to int64 to get on a common
* type. Then check valid time ranges against the partition/dimension
* type */
switch (*interval_type)
{
case INT2OID:
converted = Int64GetDatum((int64) DatumGetInt16(converted));
break;
case INT4OID:
converted = Int64GetDatum((int64) DatumGetInt32(converted));
break;
case INT8OID:
break;
case INTERVALOID:
/* For timestamp types, we only support Interval, so nothing further
* to do. */
return converted;
default:
pg_unreachable();
break;
}
/* Cap at min and max */
if (DatumGetInt64(converted) < ts_time_get_min(dim_type))
converted = ts_time_get_min(dim_type);
else if (DatumGetInt64(converted) > ts_time_get_max(dim_type))
converted = ts_time_get_max(dim_type);
/* Convert to the desired integer type */
switch (dim_type)
{
case INT2OID:
converted = Int16GetDatum((int16) DatumGetInt64(converted));
break;
case INT4OID:
converted = Int32GetDatum((int32) DatumGetInt64(converted));
break;
case INT8OID:
/* Already int64, so nothing to do. */
break;
default:
pg_unreachable();
break;
}
*interval_type = dim_type;
return converted;
}
/* One refresh-policy offset argument (start or end) as given by the user. */
typedef struct CaggPolicyOffset
{
/* Converted argument value; meaningful only when isnull is false */
Datum value;
/* Type OID of value after conversion */
Oid type;
/* True when the SQL argument was NULL */
bool isnull;
/* Config key name (also used in conversion error messages) */
const char *name;
} CaggPolicyOffset;
/* Parsed refresh-policy settings for one continuous aggregate. */
typedef struct CaggPolicyConfig
{
/* Partition (time) type of the cagg's primary dimension */
Oid partition_type;
/* Offset from "now" back to the start of the refresh window */
CaggPolicyOffset offset_start;
/* Offset from "now" back to the end of the refresh window */
CaggPolicyOffset offset_end;
} CaggPolicyConfig;
/*
 * Convert an interval to a 128 integer value.
 *
 * Based on PostgreSQL's interval_cmp_value().
 */
static inline INT128
interval_to_int128(const Interval *interval)
{
	INT128 total_usecs;
	int64 whole_days;
	int64 usec_remainder;

	/*
	 * Split the time field into whole days plus a sub-day remainder, then
	 * fold the month and day fields into the day count. The day count
	 * cannot overflow int64 here.
	 */
	usec_remainder = interval->time % USECS_PER_DAY;
	whole_days = interval->time / USECS_PER_DAY;
	whole_days += interval->month * INT64CONST(30);
	whole_days += interval->day;

	/* Start from the sub-day microseconds widened to 128 bits... */
	total_usecs = int64_to_int128(usec_remainder);

	/* ...then add the days scaled to microseconds as a 128-bit product */
	int128_add_int64_mul_int64(&total_usecs, whole_days, USECS_PER_DAY);

	return total_usecs;
}
/*
 * Convert an offset datum of the given type to int64.
 *
 * Integer types are simply widened. Intervals are converted via 128-bit
 * arithmetic and clamped to the timestamptz min/max range to avoid
 * overflow.
 */
static int64
interval_to_int64(Datum interval, Oid type)
{
	if (type == INT2OID)
		return DatumGetInt16(interval);
	if (type == INT4OID)
		return DatumGetInt32(interval);
	if (type == INT8OID)
		return DatumGetInt64(interval);
	if (type == INTERVALOID)
	{
		const int64 max = ts_time_get_max(TIMESTAMPTZOID);
		const int64 min = ts_time_get_min(TIMESTAMPTZOID);
		INT128 wide = interval_to_int128(DatumGetIntervalP(interval));

		/* Clamp the 128-bit value into the representable int64 range */
		if (int128_compare(wide, int64_to_int128(max)) >= 0)
			return max;
		if (int128_compare(wide, int64_to_int128(min)) <= 0)
			return min;

		return int128_to_int64(wide);
	}

	pg_unreachable();

	return 0;
}
/*
 * Render the minimum refresh window (two bucket widths) as a
 * human-readable string for use in error hints.
 */
static const char *
two_buckets_to_str(const ContinuousAgg *cagg)
{
	const Oid bucket_type =
		IS_TIMESTAMP_TYPE(cagg->partition_type) ? INTERVALOID : cagg->partition_type;
	int64 two_buckets;
	Datum min_range;
	Oid outfuncid;
	bool isvarlena;

	/* Saturating add guards against overflow for very large bucket widths */
	two_buckets = ts_time_saturating_add(cagg->data.bucket_width,
										 cagg->data.bucket_width,
										 cagg->partition_type);
	min_range = ts_internal_to_interval_value(two_buckets, bucket_type);
	getTypeOutputInfo(bucket_type, &outfuncid, &isvarlena);
	Assert(!isvarlena);

	return DatumGetCString(OidFunctionCall1(outfuncid, min_range));
}
/*
 * Enforce that a policy has a refresh window of at least two buckets to
 * ensure we materialize at least one bucket each run.
 *
 * Why two buckets? Note that the policy probably won't execute at a time
 * that exactly aligns with a bucket boundary, so a window of one bucket
 * might not cover a full bucket that we want to materialize:
 *
 * Refresh window: [-----)
 * Materialized buckets: |-----|-----|-----|
 */
static void
validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
{
	int64 start_offset;
	int64 end_offset;
	int64 two_buckets;

	/* A NULL start offset means the window extends to the type's max... */
	if (config->offset_start.isnull)
		start_offset = ts_time_get_max(cagg->partition_type);
	else
		start_offset = interval_to_int64(config->offset_start.value, config->offset_start.type);

	/* ...and a NULL end offset means it extends to the type's min */
	if (config->offset_end.isnull)
		end_offset = ts_time_get_min(cagg->partition_type);
	else
		end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);

	/* Compute the two-bucket span with a saturating add; a plain
	 * "bucket_width * 2" could overflow int64 for very large bucket
	 * widths (cf. two_buckets_to_str()) */
	two_buckets = ts_time_saturating_add(cagg->data.bucket_width,
										 cagg->data.bucket_width,
										 INT8OID);

	if (ts_time_saturating_add(end_offset, two_buckets, INT8OID) > start_offset)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("policy refresh window too small"),
				 errdetail("The start and end offsets must cover at least"
						   " two buckets in the valid time range of type \"%s\".",
						   format_type_be(cagg->partition_type)),
				 errhint("Use a start and end offset that specifies"
						 " a window of at least %s.",
						 two_buckets_to_str(cagg))));
}
static void
check_valid_interval_values(Oid interval_type, Datum start_offset, Datum end_offset)
parse_offset_arg(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo, CaggPolicyOffset *offset,
int argnum)
{
bool valid = true;
if (IS_INTEGER_TYPE(interval_type))
offset->isnull = PG_ARGISNULL(argnum);
if (!offset->isnull)
{
switch (interval_type)
{
case INT2OID:
{
if (DatumGetInt16(start_offset) <= DatumGetInt16(end_offset))
valid = false;
break;
}
case INT4OID:
{
if (DatumGetInt32(start_offset) <= DatumGetInt32(end_offset))
valid = false;
break;
}
case INT8OID:
{
if (DatumGetInt64(start_offset) <= DatumGetInt64(end_offset))
valid = false;
break;
}
}
}
else
{
Assert(interval_type == INTERVALOID);
valid = DatumGetBool(DirectFunctionCall2(interval_gt, start_offset, end_offset));
}
if (!valid)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("start interval should be greater than end interval")));
Oid type = get_fn_expr_argtype(fcinfo->flinfo, argnum);
Datum arg = PG_GETARG_DATUM(argnum);
offset->value = convert_interval_arg(cagg->partition_type, arg, &type, offset->name);
offset->type = type;
}
}
/*
 * Parse and validate the refresh-policy arguments into a CaggPolicyConfig.
 *
 * Defaults describe the widest possible window and are overwritten by any
 * non-NULL SQL arguments; the resulting window is then size-validated.
 */
static void
parse_cagg_policy_config(const ContinuousAgg *cagg, const FunctionCallInfo fcinfo,
						 CaggPolicyConfig *config)
{
	const Oid part_type = cagg->partition_type;
	const Oid default_offset_type =
		IS_TIMESTAMP_TYPE(part_type) ? INTERVALOID : part_type;

	MemSet(config, 0, sizeof(CaggPolicyConfig));
	config->partition_type = part_type;

	/* This might seem backwards, but since we are dealing with offsets,
	 * start actually translates to max and end to min for the maximum
	 * window. */
	config->offset_start.value = ts_time_datum_get_max(part_type);
	config->offset_start.type = default_offset_type;
	config->offset_start.name = CONFIG_KEY_START_OFFSET;
	config->offset_end.value = ts_time_datum_get_min(part_type);
	config->offset_end.type = default_offset_type;
	config->offset_end.name = CONFIG_KEY_END_OFFSET;

	parse_offset_arg(cagg, fcinfo, &config->offset_start, 1);
	parse_offset_arg(cagg, fcinfo, &config->offset_end, 2);

	Assert(config->offset_start.type == config->offset_end.type);
	validate_window_size(cagg, config);
}
Datum
policy_refresh_cagg_add(PG_FUNCTION_ARGS)
{
NameData application_name;
NameData refresh_name;
NameData proc_name, proc_schema, owner;
Cache *hcache;
Hypertable *mat_ht;
Dimension *dim;
ContinuousAgg *cagg;
int32 job_id, mat_htid;
Datum start_offset, end_offset;
CaggPolicyConfig policyconf;
int32 job_id;
Interval refresh_interval;
Oid dim_type, start_offset_type, end_offset_type;
Oid cagg_oid, owner_id;
List *jobs;
JsonbParseState *parse_state = NULL;
bool if_not_exists, start_isnull, end_isnull;
bool if_not_exists;
/* Verify that the owner can create a background worker */
cagg_oid = PG_GETARG_OID(0);
@ -257,49 +471,20 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
hcache = ts_hypertable_cache_pin();
mat_htid = cagg->data.mat_hypertable_id;
mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid);
dim = hyperspace_get_open_dimension(mat_ht->space, 0);
dim_type = ts_dimension_get_partition_type(dim);
ts_cache_release(hcache);
/* Try to convert the argument to the time type used by the
* continuous aggregate */
start_offset = PG_GETARG_DATUM(1);
end_offset = PG_GETARG_DATUM(2);
start_isnull = PG_ARGISNULL(1);
end_isnull = PG_ARGISNULL(2);
start_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
end_offset_type = get_fn_expr_argtype(fcinfo->flinfo, 2);
if (!start_isnull)
start_offset = convert_interval_arg(dim_type,
start_offset,
&start_offset_type,
CONFIG_KEY_START_OFFSET);
if (!end_isnull)
end_offset =
convert_interval_arg(dim_type, end_offset, &end_offset_type, CONFIG_KEY_END_OFFSET);
if (!start_isnull && !end_isnull)
{
Assert(start_offset_type == end_offset_type);
check_valid_interval_values(start_offset_type, start_offset, end_offset);
}
parse_cagg_policy_config(cagg, fcinfo, &policyconf);
if (PG_ARGISNULL(3))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot use NULL schedule interval")));
refresh_interval = *PG_GETARG_INTERVAL_P(3);
if_not_exists = PG_GETARG_BOOL(4);
/* Make sure there is only 1 refresh policy on the cagg */
jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
INTERNAL_SCHEMA_NAME,
mat_htid);
cagg->data.mat_hypertable_id);
if (jobs != NIL)
{
@ -313,14 +498,14 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
if (policy_config_check_hypertable_lag_equality(existing->fd.config,
CONFIG_KEY_START_OFFSET,
dim_type,
start_offset_type,
start_offset) &&
cagg->partition_type,
policyconf.offset_start.type,
policyconf.offset_start.value) &&
policy_config_check_hypertable_lag_equality(existing->fd.config,
CONFIG_KEY_END_OFFSET,
dim_type,
end_offset_type,
end_offset))
cagg->partition_type,
policyconf.offset_end.type,
policyconf.offset_end.value))
{
/* If all arguments are the same, do nothing */
ereport(NOTICE,
@ -348,19 +533,20 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
namestrcpy(&owner, GetUserNameFromId(owner_id, false));
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, mat_htid);
if (!start_isnull)
ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, cagg->data.mat_hypertable_id);
if (!policyconf.offset_start.isnull)
json_add_dim_interval_value(parse_state,
CONFIG_KEY_START_OFFSET,
start_offset_type,
start_offset);
policyconf.offset_start.type,
policyconf.offset_start.value);
else
ts_jsonb_add_null(parse_state, CONFIG_KEY_START_OFFSET);
if (!end_isnull)
if (!policyconf.offset_end.isnull)
json_add_dim_interval_value(parse_state,
CONFIG_KEY_END_OFFSET,
end_offset_type,
end_offset);
policyconf.offset_end.type,
policyconf.offset_end.value);
else
ts_jsonb_add_null(parse_state, CONFIG_KEY_END_OFFSET);
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
@ -376,7 +562,7 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
&proc_name,
&owner,
true,
mat_htid,
cagg->data.mat_hypertable_id,
config);
PG_RETURN_INT32(job_id);

View File

@ -323,13 +323,9 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
PolicyContinuousAggData policy_data;
policy_refresh_cagg_read_and_validate_config(config, &policy_data);
elog(LOG,
"refresh continuous aggregate range %s , %s",
ts_internal_to_time_string(policy_data.refresh_window.start,
policy_data.refresh_window.type),
ts_internal_to_time_string(policy_data.refresh_window.end,
policy_data.refresh_window.type));
continuous_agg_refresh_internal(policy_data.cagg, &policy_data.refresh_window, false);
continuous_agg_refresh_internal(policy_data.cagg,
&policy_data.refresh_window,
CAGG_REFRESH_POLICY);
return true;
}

View File

@ -1787,8 +1787,6 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
{
Oid relid;
ContinuousAgg *cagg;
Hypertable *cagg_ht;
Dimension *time_dim;
InternalTimeRange refresh_window = {
.type = InvalidOid,
};
@ -1803,13 +1801,11 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
relid = get_relname_relid(stmt->into->rel->relname, nspid);
cagg = ts_continuous_agg_find_by_relid(relid);
Assert(cagg != NULL);
cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0);
refresh_window.type = ts_dimension_get_partition_type(time_dim);
refresh_window.type = cagg->partition_type;
refresh_window.start = ts_time_get_min(refresh_window.type);
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);
continuous_agg_refresh_internal(cagg, &refresh_window, true);
continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION);
}
return DDL_DONE;
}

View File

@ -274,9 +274,6 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
Oid outfuncid = InvalidOid;
bool isvarlena;
if (client_min_messages > elevel)
return;
start_ts = ts_internal_to_time_value(refresh_window->start, refresh_window->type);
end_ts = ts_internal_to_time_value(refresh_window->end, refresh_window->type);
getTypeOutputInfo(refresh_window->type, &outfuncid, &isvarlena);
@ -360,15 +357,6 @@ get_cagg_by_relid(const Oid cagg_relid)
return cagg;
}
/* Look up the partition (time) type of a cagg's materialization
 * hypertable via its first open dimension. */
static Oid
get_partition_type_by_cagg(const ContinuousAgg *const cagg)
{
	Hypertable *mat_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
	Dimension *open_dim = hyperspace_get_open_dimension(mat_ht->space, 0);

	return ts_dimension_get_partition_type(open_dim);
}
#define REFRESH_FUNCTION_NAME "refresh_continuous_aggregate()"
/*
* Refresh a continuous aggregate across the given window.
@ -383,7 +371,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
};
cagg = get_cagg_by_relid(cagg_relid);
refresh_window.type = get_partition_type_by_cagg(cagg);
refresh_window.type = cagg->partition_type;
if (!PG_ARGISNULL(1))
refresh_window.start = ts_time_value_from_arg(PG_GETARG_DATUM(1),
@ -399,22 +387,31 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
else
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);
continuous_agg_refresh_internal(cagg, &refresh_window, false);
continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_WINDOW);
PG_RETURN_VOID();
}
static void
emit_up_to_date_notice(const ContinuousAgg *cagg)
emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx)
{
elog(NOTICE,
"continuous aggregate \"%s\" is already up-to-date",
NameStr(cagg->data.user_view_name));
switch (callctx)
{
case CAGG_REFRESH_CHUNK:
case CAGG_REFRESH_WINDOW:
case CAGG_REFRESH_CREATION:
elog(NOTICE,
"continuous aggregate \"%s\" is already up-to-date",
NameStr(cagg->data.user_view_name));
break;
case CAGG_REFRESH_POLICY:
break;
}
}
static bool
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, bool verbose,
int32 chunk_id)
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx, int32 chunk_id)
{
InvalidationStore *invalidations;
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id);
@ -432,7 +429,7 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
if (invalidations != NULL)
{
if (verbose)
if (callctx == CAGG_REFRESH_CREATION)
{
Assert(OidIsValid(cagg->relid));
ereport(NOTICE,
@ -450,7 +447,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg, bool verbose)
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx)
{
Catalog *catalog = ts_catalog_get();
int32 mat_id = cagg->data.mat_hypertable_id;
@ -486,7 +484,10 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
errhint("Align the refresh window with the bucket"
" time zone or use at least two buckets.")));
log_refresh_window(DEBUG1, cagg, &refresh_window, "refreshing continuous aggregate");
log_refresh_window(callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1,
cagg,
&refresh_window,
"refreshing continuous aggregate");
/* Perform the refresh across two transactions.
*
@ -530,7 +531,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* nothing to refresh in that case */
if (refresh_window.start >= refresh_window.end)
{
emit_up_to_date_notice(cagg);
emit_up_to_date_notice(cagg, callctx);
return;
}
@ -544,8 +545,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
StartTransactionCommand();
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id);
if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, verbose, INVALID_CHUNK_ID))
emit_up_to_date_notice(cagg);
if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
emit_up_to_date_notice(cagg, callctx);
}
/*
@ -565,7 +566,7 @@ continuous_agg_refresh_chunk(PG_FUNCTION_ARGS)
Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
Catalog *catalog = ts_catalog_get();
const InternalTimeRange refresh_window = {
.type = get_partition_type_by_cagg(cagg),
.type = cagg->partition_type,
.start = ts_chunk_primary_dimension_start(chunk),
.end = ts_chunk_primary_dimension_end(chunk),
};
@ -596,7 +597,7 @@ continuous_agg_refresh_chunk(PG_FUNCTION_ARGS)
invalidation_process_hypertable_log(cagg);
/* Must make invalidation processing visible */
CommandCounterIncrement();
process_cagg_invalidations_and_refresh(cagg, &refresh_window, false, chunk->fd.id);
process_cagg_invalidations_and_refresh(cagg, &refresh_window, CAGG_REFRESH_CHUNK, chunk->fd.id);
PG_RETURN_VOID();
}

View File

@ -12,9 +12,18 @@
#include "materialize.h"
/*
 * Context from which a continuous aggregate refresh was invoked.
 *
 * Passed down to tailor notices and logging to the caller; e.g.,
 * policy-driven refreshes log at LOG level and suppress the
 * "already up-to-date" NOTICE.
 */
typedef enum CaggRefreshCallContext
{
CAGG_REFRESH_CREATION,
CAGG_REFRESH_WINDOW,
CAGG_REFRESH_CHUNK,
CAGG_REFRESH_POLICY,
} CaggRefreshCallContext;
extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
extern Datum continuous_agg_refresh_chunk(PG_FUNCTION_ARGS);
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, bool verbose);
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H */

View File

@ -138,11 +138,11 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-----------+--------------------------------------------+----------------------------------------------------
msg_no | mock_time | application_name | msg
--------+-----------+--------------------------------------------+-----------------------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0
0 | 0 | Refresh Continuous Aggregate Policy [1000] | refresh continuous aggregate range -2147483648 , 6
0 | 0 | Refresh Continuous Aggregate Policy [1000] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -2147483648, 6 ]
(3 rows)
SELECT * FROM _timescaledb_config.bgw_job where id=:job_id;
@ -356,7 +356,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view
AS SELECT time_bucket('2', time), SUM(data) as value
FROM test_continuous_agg_table
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval);
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1001
@ -371,11 +371,11 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-----------+--------------------------------------------+-----------------------------------------------------
msg_no | mock_time | application_name | msg
--------+-----------+--------------------------------------------+----------------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0
0 | 0 | Refresh Continuous Aggregate Policy [1001] | refresh continuous aggregate range -2147483648 , 12
0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ]
(3 rows)
-- job ran once, successfully
@ -416,14 +416,14 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25, 25);
(1 row)
SELECT * FROM sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+----------------------------------------------------------
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+----------------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0
0 | 0 | Refresh Continuous Aggregate Policy [1001] | refresh continuous aggregate range -2147483648 , 12
0 | 0 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ]
0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 43200000000 | DB Scheduler | [TESTING] Wait until 43200025000, started at 43200000000
0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refresh continuous aggregate range -2147483648 , 12
0 | 43200000000 | Refresh Continuous Aggregate Policy [1001] | refreshing continuous aggregate "test_continuous_agg_view" in window [ -90, 12 ]
(6 rows)
SELECT job_id, next_start - last_finish as until_next, last_run_success, total_runs, total_successes, total_failures, total_crashes
@ -484,7 +484,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view
AS SELECT time_bucket('2', time), SUM(data) as value, get_constant_no_perms()
FROM test_continuous_agg_table
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval);
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1002
@ -635,11 +635,11 @@ SELECT * FROM test_continuous_agg_view_user_2;
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
SELECT * from sorted_bgw_log;
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+---------------------------------------------------------------
msg_no | mock_time | application_name | msg
--------+-------------+--------------------------------------------+------------------------------------------------------------------------------------------------
0 | 0 | DB Scheduler | [TESTING] Registered new background worker
1 | 0 | DB Scheduler | [TESTING] Wait until 25000, started at 0
0 | 0 | Refresh Continuous Aggregate Policy [1003] | refresh continuous aggregate range -2147483648 , 3
0 | 0 | Refresh Continuous Aggregate Policy [1003] | refreshing continuous aggregate "test_continuous_agg_view_user_2" in window [ -2147483648, 2 ]
0 | 43200000000 | DB Scheduler | [TESTING] Registered new background worker
1 | 43200000000 | DB Scheduler | [TESTING] Wait until 43200025000, started at 43200000000
0 | 43200000000 | Refresh Continuous Aggregate Policy [1003] | job 1003 threw an error

View File

@ -45,23 +45,38 @@ SELECT count(*) FROM _timescaledb_config.bgw_job;
(1 row)
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval);
ERROR: "int_tab" is not a continuous aggregate
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval);
ERROR: invalid parameter value for start_offset
HINT: Use time interval of type integer with the continuous aggregate.
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 );
ERROR: function add_continuous_aggregate_policy(unknown, interval, integer) does not exist at character 8
ERROR: function add_continuous_aggregate_policy(unknown, interval, integer) does not exist
LINE 1: SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::in...
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT add_continuous_aggregate_policy('mat_m1', 10, '1 day'::interval, '1 h'::interval);
ERROR: invalid parameter value for end_offset
HINT: Use time interval of type integer with the continuous aggregate.
--start_interval < end_interval
SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval) as job_id \gset
ERROR: start interval should be greater than end interval
SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "integer".
HINT: Use a start and end offset that specifies a window of at least 2.
--refresh window less than two buckets
SELECT add_continuous_aggregate_policy('mat_m1', 11, 10, '1h'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "integer".
HINT: Use a start and end offset that specifies a window of at least 2.
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id \gset
--adding again should warn/error
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>false);
ERROR: continuous aggregate policy already exists for "mat_m1"
SELECT add_continuous_aggregate_policy('mat_m1', 20, 15, '1h'::interval, if_not_exists=>true);
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
@ -82,6 +97,8 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
(1 row)
SELECT hypertable_id as mat_id FROM _timescaledb_config.bgw_job where id = :job_id \gset
\set VERBOSITY terse
\set ON_ERROR_STOP 1
\c :TEST_DBNAME :ROLE_SUPERUSER
UPDATE _timescaledb_config.bgw_job
SET config = jsonb_build_object('mat_hypertable_id', :mat_id)
@ -93,6 +110,8 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
{"mat_hypertable_id": 2}
(1 row)
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for existing job
SELECT remove_continuous_aggregate_policy('int_tab');
@ -118,6 +137,8 @@ NOTICE: continuous aggregate policy not found for "mat_m1", skipping
SET ROLE :ROLE_DEFAULT_PERM_USER_2;
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id ;
ERROR: must be owner of continuous aggregate "mat_m1"
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SET ROLE :ROLE_DEFAULT_PERM_USER;
DROP MATERIALIZED VIEW mat_m1;
--- code coverage tests : add policy for timestamp and date based table ---
@ -130,31 +151,75 @@ NOTICE: adding not-null constraint to column "time"
(1 row)
CREATE MATERIALIZED VIEW max_mat_view_date
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
ERROR: invalid parameter value for end_offset
HINT: Use time interval with a continuous aggregate using timestamp-based time bucket.
--start_interval < end_interval
SELECT add_continuous_aggregate_policy('max_mat_view_date', '1 day'::interval, '2 days'::interval , '1 day'::interval) ;
ERROR: start interval should be greater than end interval
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date".
HINT: Use a start and end offset that specifies a window of at least @ 14 days.
--interval less than two buckets
SELECT add_continuous_aggregate_policy('max_mat_view_date', '7 days', '1 day', '1 day'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date".
HINT: Use a start and end offset that specifies a window of at least @ 14 days.
SELECT add_continuous_aggregate_policy('max_mat_view_date', '14 days', '1 day', '1 day'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date".
HINT: Use a start and end offset that specifies a window of at least @ 14 days.
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-10 hours', '1 day'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "date".
HINT: Use a start and end offset that specifies a window of at least @ 14 days.
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
-- Negative start offset gives two bucket window:
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1001
(1 row)
SELECT remove_continuous_aggregate_policy('max_mat_view_date');
remove_continuous_aggregate_policy
------------------------------------
(1 row)
-- Both offsets NULL:
SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1002
(1 row)
SELECT remove_continuous_aggregate_policy('max_mat_view_date');
remove_continuous_aggregate_policy
------------------------------------
(1 row)
SELECT add_continuous_aggregate_policy('max_mat_view_date', '15 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
-------------------------------------------------------------------------------
{"end_offset": "@ 1 day", "start_offset": "@ 2 days", "mat_hypertable_id": 4}
config
--------------------------------------------------------------------------------
{"end_offset": "@ 1 day", "start_offset": "@ 15 days", "mat_hypertable_id": 4}
(1 row)
INSERT INTO continuous_agg_max_mat_date
SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day');
SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day');
--- to prevent NOTICES set message level to warning
SET client_min_messages TO warning;
SET client_min_messages TO warning;
CALL run_job(:job_id);
RESET client_min_messages ;
RESET client_min_messages;
DROP MATERIALIZED VIEW max_mat_view_date;
CREATE TABLE continuous_agg_timestamp(time TIMESTAMP);
SELECT create_hypertable('continuous_agg_timestamp', 'time');
@ -165,42 +230,48 @@ NOTICE: adding not-null constraint to column "time"
(1 row)
CREATE MATERIALIZED VIEW max_mat_view_timestamp
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
--the start offset overflows the smallest time value, but is capped at
--the min value
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1004
(1 row)
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
remove_continuous_aggregate_policy
------------------------------------
(1 row)
\set ON_ERROR_STOP 0
--will overflow at runtime even though policy check works
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
ERROR: timestamp out of range
-- bad timestamps at runtime even though policy check works
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
remove_continuous_aggregate_policy
------------------------------------
(1 row)
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
ERROR: invalid refresh window
\set VERBOSITY default
--start and end offset capped at the lowest time value, which means
--zero size window
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '900000 years' , '1 h'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "timestamp without time zone".
HINT: Use a start and end offset that specifies a window of at least @ 14 days.
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval);
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "timestamp without time zone".
HINT: Use a start and end offset that specifies a window of at least @ 14 days.
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
remove_continuous_aggregate_policy
------------------------------------
(1 row)
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset
--- to prevent NOTICES set message level to warning
SET client_min_messages TO warning;
SET client_min_messages TO warning;
CALL run_job(:job_id);
RESET client_min_messages ;
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
---------------------------------------------------------------------------------
{"end_offset": "@ 1 hour", "start_offset": "@ 10 days", "mat_hypertable_id": 6}
{"end_offset": "@ 1 hour", "start_offset": "@ 15 days", "mat_hypertable_id": 6}
(1 row)
\c :TEST_DBNAME :ROLE_SUPERUSER
@ -215,7 +286,7 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
(1 row)
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for job
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
@ -244,12 +315,17 @@ SELECT time_bucket( SMALLINT '1', a) , count(*)
FROM smallint_tab
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 0 , '1 h'::interval);
ERROR: invalid parameter value for start_offset
HINT: Use time interval of type smallint with the continuous aggregate.
SELECT add_continuous_aggregate_policy('mat_smallint', 98898::smallint , 0::smallint, '1 h'::interval);
ERROR: smallint out of range
SELECT add_continuous_aggregate_policy('mat_smallint', 5::smallint, 10::smallint , '1 h'::interval) as job_id \gset
ERROR: start interval should be greater than end interval
ERROR: policy refresh window too small
DETAIL: The start and end offsets must cover at least two buckets in the valid time range of type "smallint".
HINT: Use a start and end offset that specifies a window of at least 2.
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 0::smallint , '1 h'::interval) as job_id \gset
INSERT INTO smallint_tab VALUES(5);
@ -296,7 +372,7 @@ SELECT * FROM mat_smallint ORDER BY 1;
---+--------
(0 rows)
-- overflow start_interval. now this runs as range is capped [-32768, -32765)
-- overflow start_interval. now this runs as range is capped [-32768, -32765)
INSERT INTO smallint_tab VALUES( -32760 );
SELECT maxval, maxval - 10, maxval -5 FROM integer_now_smallint_tab() as maxval;
maxval | ?column? | ?column?
@ -320,7 +396,7 @@ SELECT * FROM mat_smallint ORDER BY 1;
(0 rows)
-- Case 2: overflow by subtracting from PG_INT16_MAX
--overflow start and end . will fail as range is [32767, 32767]
--overflow start and end . will fail as range is [32767, 32767]
SELECT remove_continuous_aggregate_policy('mat_smallint');
remove_continuous_aggregate_policy
------------------------------------
@ -335,7 +411,7 @@ SELECT maxval, maxval - (-1), maxval - (-2) FROM integer_now_smallint_tab() as m
32767 | 32768 | 32769
(1 row)
SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -2::smallint , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -3::smallint , '1 h'::interval) as job_id \gset
\set ON_ERROR_STOP 0
CALL run_job(:job_id);
ERROR: invalid refresh window
@ -351,14 +427,14 @@ SELECT remove_continuous_aggregate_policy('mat_smallint');
(1 row)
--overflow end . will work range is [32765, 32767)
--overflow end . will work range is [32765, 32767)
SELECT maxval, maxval - (1), maxval - (-2) FROM integer_now_smallint_tab() as maxval;
maxval | ?column? | ?column?
--------+----------+----------
32767 | 32766 | 32769
(1 row)
SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -2::smallint , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -3::smallint , '1 h'::interval) as job_id \gset
\set ON_ERROR_STOP 0
CALL run_job(:job_id);
SELECT * FROM mat_smallint ORDER BY 1;
@ -409,7 +485,7 @@ FROM bigint_tab
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_bigint', 5::bigint, 10::bigint , '1 h'::interval) ;
ERROR: start interval should be greater than end interval
ERROR: policy refresh window too small
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('mat_bigint', 15::bigint, 0::bigint , '1 h'::interval) as job_mid \gset
INSERT INTO bigint_tab VALUES(5);

View File

@ -55,7 +55,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
AS SELECT time_bucket('7 days', time), count(*)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset
INSERT INTO continuous_agg_timestamp
SELECT generate_series('2019-09-01 00:00'::timestamp, '2019-09-10 00:00'::timestamp, '1 day');
--- to prevent NOTICES set message level to warning

View File

@ -227,7 +227,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view
FROM test_continuous_agg_table
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval);
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval);
SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg \gset
SELECT id AS job_id FROM _timescaledb_config.bgw_job WHERE hypertable_id=:mat_hypertable_id \gset
@ -288,7 +288,7 @@ CREATE MATERIALIZED VIEW test_continuous_agg_view
FROM test_continuous_agg_table
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', NULL, -2::integer, '12 h'::interval);
SELECT add_continuous_aggregate_policy('test_continuous_agg_view', 100::integer, -2::integer, '12 h'::interval);
SELECT id AS job_id FROM _timescaledb_config.bgw_job ORDER BY id desc limit 1 \gset

View File

@ -40,12 +40,15 @@ SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT count(*) FROM _timescaledb_config.bgw_job;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval);
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval);
SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 );
SELECT add_continuous_aggregate_policy('mat_m1', 10, '1 day'::interval, '1 h'::interval);
--start_interval < end_interval
SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval);
--refresh window less than two buckets
SELECT add_continuous_aggregate_policy('mat_m1', 11, 10, '1h'::interval);
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id \gset
--adding again should warn/error
@ -56,12 +59,18 @@ SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_
-- modify config and try to add, should error out
SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
SELECT hypertable_id as mat_id FROM _timescaledb_config.bgw_job where id = :job_id \gset
\set VERBOSITY terse
\set ON_ERROR_STOP 1
\c :TEST_DBNAME :ROLE_SUPERUSER
UPDATE _timescaledb_config.bgw_job
SET config = jsonb_build_object('mat_hypertable_id', :mat_id)
WHERE id = :job_id;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true);
SELECT remove_continuous_aggregate_policy('int_tab');
@ -74,6 +83,8 @@ SELECT remove_continuous_aggregate_policy('mat_m1', if_not_exists=>true);
--should fail
SET ROLE :ROLE_DEFAULT_PERM_USER_2;
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id ;
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SET ROLE :ROLE_DEFAULT_PERM_USER;
DROP MATERIALIZED VIEW mat_m1;
@ -82,52 +93,69 @@ DROP MATERIALIZED VIEW mat_m1;
CREATE TABLE continuous_agg_max_mat_date(time DATE);
SELECT create_hypertable('continuous_agg_max_mat_date', 'time');
CREATE MATERIALIZED VIEW max_mat_view_date
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
--start_interval < end_interval
SELECT add_continuous_aggregate_policy('max_mat_view_date', '1 day'::interval, '2 days'::interval , '1 day'::interval) ;
--interval less than two buckets
SELECT add_continuous_aggregate_policy('max_mat_view_date', '7 days', '1 day', '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '14 days', '1 day', '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-10 hours', '1 day'::interval);
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
-- Negative start offset gives two bucket window:
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval);
SELECT remove_continuous_aggregate_policy('max_mat_view_date');
-- Both offsets NULL:
SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval);
SELECT remove_continuous_aggregate_policy('max_mat_view_date');
SELECT add_continuous_aggregate_policy('max_mat_view_date', '15 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
INSERT INTO continuous_agg_max_mat_date
SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day');
SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day');
--- to prevent NOTICES set message level to warning
SET client_min_messages TO warning;
SET client_min_messages TO warning;
CALL run_job(:job_id);
RESET client_min_messages ;
RESET client_min_messages;
DROP MATERIALIZED VIEW max_mat_view_date;
CREATE TABLE continuous_agg_timestamp(time TIMESTAMP);
SELECT create_hypertable('continuous_agg_timestamp', 'time');
CREATE MATERIALIZED VIEW max_mat_view_timestamp
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
--the start offset overflows the smallest time value, but is capped at
--the min value
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval);
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
\set ON_ERROR_STOP 0
--will overflow at runtime even though policy check works
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
-- bad timestamps at runtime even though policy check works
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
\set VERBOSITY default
--start and end offset capped at the lowest time value, which means
--zero size window
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '900000 years' , '1 h'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval);
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset
--- to prevent NOTICES set message level to warning
SET client_min_messages TO warning;
SET client_min_messages TO warning;
CALL run_job(:job_id);
RESET client_min_messages ;
@ -142,7 +170,7 @@ WHERE id = :job_id;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
@ -161,9 +189,11 @@ SELECT time_bucket( SMALLINT '1', a) , count(*)
FROM smallint_tab
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 0 , '1 h'::interval);
SELECT add_continuous_aggregate_policy('mat_smallint', 98898::smallint , 0::smallint, '1 h'::interval);
SELECT add_continuous_aggregate_policy('mat_smallint', 5::smallint, 10::smallint , '1 h'::interval) as job_id \gset
\set VERBOSITY terse
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 0::smallint , '1 h'::interval) as job_id \gset
INSERT INTO smallint_tab VALUES(5);
@ -189,7 +219,7 @@ CALL run_job(:job_id);
\set ON_ERROR_STOP 1
SELECT * FROM mat_smallint ORDER BY 1;
-- overflow start_interval. now this runs as range is capped [-32768, -32765)
-- overflow start_interval. now this runs as range is capped [-32768, -32765)
INSERT INTO smallint_tab VALUES( -32760 );
SELECT maxval, maxval - 10, maxval -5 FROM integer_now_smallint_tab() as maxval;
CALL run_job(:job_id);
@ -201,21 +231,21 @@ CALL refresh_continuous_aggregate('mat_smallint', NULL, NULL);
SELECT * FROM mat_smallint ORDER BY 1;
-- Case 2: overflow by subtracting from PG_INT16_MAX
--overflow start and end . will fail as range is [32767, 32767]
--overflow start and end . will fail as range is [32767, 32767]
SELECT remove_continuous_aggregate_policy('mat_smallint');
INSERT INTO smallint_tab VALUES( 32766 );
INSERT INTO smallint_tab VALUES( 32767 );
SELECT maxval, maxval - (-1), maxval - (-2) FROM integer_now_smallint_tab() as maxval;
SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -2::smallint , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -3::smallint , '1 h'::interval) as job_id \gset
\set ON_ERROR_STOP 0
CALL run_job(:job_id);
\set ON_ERROR_STOP 1
SELECT * FROM mat_smallint ORDER BY 1;
SELECT remove_continuous_aggregate_policy('mat_smallint');
--overflow end . will work range is [32765, 32767)
--overflow end . will work range is [32765, 32767)
SELECT maxval, maxval - (1), maxval - (-2) FROM integer_now_smallint_tab() as maxval;
SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -2::smallint , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -3::smallint , '1 h'::interval) as job_id \gset
\set ON_ERROR_STOP 0
CALL run_job(:job_id);
SELECT * FROM mat_smallint ORDER BY 1;

View File

@ -40,7 +40,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset
INSERT INTO continuous_agg_timestamp
SELECT generate_series('2019-09-01 00:00'::timestamp, '2019-09-10 00:00'::timestamp, '1 day');
--- to prevent NOTICES set message level to warning

View File

@ -109,7 +109,6 @@ CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-01');
-- Bad time input
CALL refresh_continuous_aggregate('daily_temp', '2020-05-01'::text, '2020-05-03'::text);
CALL refresh_continuous_aggregate('daily_temp', 0, '2020-05-01');
\set ON_ERROR_STOP 1
-- Test different time types