mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-18 19:59:48 +08:00
Support for CAgg with origin/offset parameter
So far, we allowed only CAggs without origin or offset parameters in the time_bucket definition. This commit adds support for the remaining time_bucket variants. Fixes #2265, Fixes #5453, Fixes #5828
This commit is contained in:
parent
52094a3103
commit
8d9b06294e
.unreleased
src
tsl
src/continuous_aggs
test
1
.unreleased/feature_6382
Normal file
1
.unreleased/feature_6382
Normal file
@ -0,0 +1 @@
|
||||
Implements: #6382 Support for time_bucket with origin and offset in CAggs
|
@ -204,20 +204,22 @@ static FuncInfo funcinfo[] = {
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Interval Bucket with origin */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INTERVALOID, TIMESTAMPOID, TIMESTAMPOID },
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Interval Bucket with offset */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INTERVALOID, TIMESTAMPOID, INTERVALOID },
|
||||
@ -234,20 +236,22 @@ static FuncInfo funcinfo[] = {
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Interval Bucket with origin */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INTERVALOID, TIMESTAMPTZOID, TIMESTAMPTZOID },
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Interval Bucket with offset */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INTERVALOID, TIMESTAMPTZOID, INTERVALOID },
|
||||
@ -264,20 +268,22 @@ static FuncInfo funcinfo[] = {
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Interval Bucket with origin */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INTERVALOID, DATEOID, DATEOID },
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Interval Bucket with offset */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INTERVALOID, DATEOID, INTERVALOID },
|
||||
@ -294,10 +300,11 @@ static FuncInfo funcinfo[] = {
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Int2 Bucket with offset */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INT2OID, INT2OID, INT2OID },
|
||||
@ -314,10 +321,11 @@ static FuncInfo funcinfo[] = {
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Int4 Bucket with offset */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INT4OID, INT4OID, INT4OID },
|
||||
@ -334,10 +342,11 @@ static FuncInfo funcinfo[] = {
|
||||
.group_estimate = time_bucket_group_estimate,
|
||||
.sort_transform = time_bucket_sort_transform,
|
||||
},
|
||||
/* Int8 Bucket with offset */
|
||||
{
|
||||
.origin = ORIGIN_TIMESCALE,
|
||||
.is_bucketing_func = true,
|
||||
.allowed_in_cagg_definition = false,
|
||||
.allowed_in_cagg_definition = true,
|
||||
.funcname = "time_bucket",
|
||||
.nargs = 3,
|
||||
.arg_types = { INT8OID, INT8OID, INT8OID },
|
||||
|
@ -459,13 +459,28 @@ ts_date_offset_bucket(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_DATUM(date);
|
||||
}
|
||||
|
||||
TSDLLEXPORT int64
|
||||
ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
|
||||
{
|
||||
NullableDatum null_datum = INIT_NULL_DATUM;
|
||||
return ts_time_bucket_by_type_extended(interval,
|
||||
timestamp,
|
||||
timestamp_type,
|
||||
null_datum,
|
||||
null_datum);
|
||||
}
|
||||
|
||||
/* when working with time_buckets stored in our catalog, we may not know ahead of time which
|
||||
* bucketing function to use, this function dynamically dispatches to the correct time_bucket_<foo>
|
||||
* based on an inputted timestamp_type
|
||||
*/
|
||||
TSDLLEXPORT int64
|
||||
ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
|
||||
ts_time_bucket_by_type_extended(int64 interval, int64 timestamp, Oid timestamp_type,
|
||||
NullableDatum offset, NullableDatum origin)
|
||||
{
|
||||
/* Defined offset and origin in one function is not supported */
|
||||
Assert(offset.isnull == true || origin.isnull == true);
|
||||
|
||||
Datum timestamp_in_time_type = ts_internal_to_time_value(timestamp, timestamp_type);
|
||||
Datum interval_in_interval_type;
|
||||
Datum time_bucketed;
|
||||
@ -487,22 +502,48 @@ ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
|
||||
break;
|
||||
case TIMESTAMPOID:
|
||||
interval_in_interval_type = ts_internal_to_interval_value(interval, INTERVALOID);
|
||||
bucket_function = ts_timestamp_bucket;
|
||||
if (offset.isnull)
|
||||
bucket_function = ts_timestamp_bucket; /* handles also origin */
|
||||
else
|
||||
bucket_function = ts_timestamp_offset_bucket;
|
||||
break;
|
||||
case TIMESTAMPTZOID:
|
||||
interval_in_interval_type = ts_internal_to_interval_value(interval, INTERVALOID);
|
||||
bucket_function = ts_timestamptz_bucket;
|
||||
if (offset.isnull)
|
||||
bucket_function = ts_timestamptz_bucket; /* handles also origin */
|
||||
else
|
||||
bucket_function = ts_timestamptz_offset_bucket;
|
||||
break;
|
||||
case DATEOID:
|
||||
interval_in_interval_type = ts_internal_to_interval_value(interval, INTERVALOID);
|
||||
bucket_function = ts_date_bucket;
|
||||
if (offset.isnull)
|
||||
bucket_function = ts_date_bucket; /* handles also origin */
|
||||
else
|
||||
bucket_function = ts_date_offset_bucket;
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "invalid time_bucket type \"%s\"", format_type_be(timestamp_type));
|
||||
}
|
||||
|
||||
time_bucketed =
|
||||
DirectFunctionCall2(bucket_function, interval_in_interval_type, timestamp_in_time_type);
|
||||
if (!offset.isnull)
|
||||
{
|
||||
time_bucketed = DirectFunctionCall3(bucket_function,
|
||||
interval_in_interval_type,
|
||||
timestamp_in_time_type,
|
||||
offset.value);
|
||||
}
|
||||
else if (!origin.isnull)
|
||||
{
|
||||
time_bucketed = DirectFunctionCall3(bucket_function,
|
||||
interval_in_interval_type,
|
||||
timestamp_in_time_type,
|
||||
origin.value);
|
||||
}
|
||||
else
|
||||
{
|
||||
time_bucketed =
|
||||
DirectFunctionCall2(bucket_function, interval_in_interval_type, timestamp_in_time_type);
|
||||
}
|
||||
|
||||
return ts_time_value_to_internal(time_bucketed, timestamp_type);
|
||||
}
|
||||
|
@ -18,6 +18,9 @@ extern TSDLLEXPORT Datum ts_timestamp_bucket(PG_FUNCTION_ARGS);
|
||||
extern TSDLLEXPORT Datum ts_timestamptz_bucket(PG_FUNCTION_ARGS);
|
||||
extern TSDLLEXPORT Datum ts_timestamptz_timezone_bucket(PG_FUNCTION_ARGS);
|
||||
extern TSDLLEXPORT int64 ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid type);
|
||||
extern TSDLLEXPORT int64 ts_time_bucket_by_type_extended(int64 interval, int64 timestamp, Oid type,
|
||||
NullableDatum offset,
|
||||
NullableDatum origin);
|
||||
extern TSDLLEXPORT Datum ts_time_bucket_ng_date(PG_FUNCTION_ARGS);
|
||||
extern TSDLLEXPORT Datum ts_time_bucket_ng_timestamp(PG_FUNCTION_ARGS);
|
||||
extern TSDLLEXPORT Datum ts_time_bucket_ng_timestamptz(PG_FUNCTION_ARGS);
|
||||
|
@ -1383,8 +1383,9 @@ ts_continuous_agg_bucket_on_interval(Oid bucket_function)
|
||||
}
|
||||
|
||||
/*
|
||||
* Calls one of time_bucket_ng() versions depending on the arguments. This is
|
||||
* a common procedure used by ts_compute_* below.
|
||||
* Calls the desired time bucket function depending on the arguments. If the experimental flag is
|
||||
* set on ContinuousAggsBucketFunction, one of time_bucket_ng() versions is used. This is a common
|
||||
* procedure used by ts_compute_* below.
|
||||
*/
|
||||
static Datum
|
||||
generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
|
||||
@ -1524,7 +1525,8 @@ void
|
||||
ts_compute_inscribed_bucketed_refresh_window_variable(int64 *start, int64 *end,
|
||||
const ContinuousAggsBucketFunction *bf)
|
||||
{
|
||||
Datum start_old, end_old, start_new, end_new;
|
||||
Datum start_old, end_old, start_aligned, end_aliged;
|
||||
|
||||
/*
|
||||
* It's OK to use TIMESTAMPOID here. Variable-sized buckets can be used
|
||||
* only for dates, timestamps and timestamptz's. For all these types our
|
||||
@ -1535,16 +1537,16 @@ ts_compute_inscribed_bucketed_refresh_window_variable(int64 *start, int64 *end,
|
||||
start_old = ts_internal_to_time_value(*start, TIMESTAMPOID);
|
||||
end_old = ts_internal_to_time_value(*end, TIMESTAMPOID);
|
||||
|
||||
start_new = generic_time_bucket(bf, start_old);
|
||||
end_new = generic_time_bucket(bf, end_old);
|
||||
start_aligned = generic_time_bucket(bf, start_old);
|
||||
end_aliged = generic_time_bucket(bf, end_old);
|
||||
|
||||
if (DatumGetTimestamp(start_new) != DatumGetTimestamp(start_old))
|
||||
if (DatumGetTimestamp(start_aligned) != DatumGetTimestamp(start_old))
|
||||
{
|
||||
start_new = generic_add_interval(bf, start_new);
|
||||
start_aligned = generic_add_interval(bf, start_aligned);
|
||||
}
|
||||
|
||||
*start = ts_time_value_to_internal(start_new, TIMESTAMPOID);
|
||||
*end = ts_time_value_to_internal(end_new, TIMESTAMPOID);
|
||||
*start = ts_time_value_to_internal(start_aligned, TIMESTAMPOID);
|
||||
*end = ts_time_value_to_internal(end_aliged, TIMESTAMPOID);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -129,6 +129,11 @@ extern TSDLLEXPORT List *ts_get_reloptions(Oid relid);
|
||||
|
||||
#define is_inheritance_table(relid) (is_inheritance_child(relid) || is_inheritance_parent(relid))
|
||||
|
||||
#define INIT_NULL_DATUM \
|
||||
{ \
|
||||
.value = 0, .isnull = true \
|
||||
}
|
||||
|
||||
static inline int64
|
||||
int64_min(int64 a, int64 b)
|
||||
{
|
||||
|
@ -6,6 +6,9 @@
|
||||
|
||||
#include "common.h"
|
||||
|
||||
#include <utils/date.h>
|
||||
#include <utils/timestamp.h>
|
||||
|
||||
static Const *check_time_bucket_argument(Node *arg, char *position);
|
||||
static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id,
|
||||
Oid hypertable_oid, AttrNumber hypertable_partition_colno,
|
||||
@ -147,6 +150,65 @@ destroy_union_query(Query *q)
|
||||
return query;
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle additional parameter of the timebucket function such as timezone, offset, or origin
|
||||
*/
|
||||
static void
|
||||
process_additional_timebucket_parameter(CAggTimebucketInfo *tbinfo, Const *arg)
|
||||
{
|
||||
char *tz_name;
|
||||
switch (exprType((Node *) arg))
|
||||
{
|
||||
/* Timezone as text */
|
||||
case TEXTOID:
|
||||
tz_name = TextDatumGetCString(arg->constvalue);
|
||||
if (!ts_is_valid_timezone_name(tz_name))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid timezone name \"%s\"", tz_name)));
|
||||
}
|
||||
|
||||
tbinfo->bucket_time_timezone = tz_name;
|
||||
break;
|
||||
case INTERVALOID:
|
||||
/* Bucket offset as interval */
|
||||
tbinfo->bucket_time_offset = DatumGetIntervalP(arg->constvalue);
|
||||
break;
|
||||
case DATEOID:
|
||||
/* Bucket origin as Date */
|
||||
tbinfo->bucket_time_origin =
|
||||
date2timestamptz_opt_overflow(DatumGetDateADT(arg->constvalue), NULL);
|
||||
break;
|
||||
case TIMESTAMPOID:
|
||||
/* Bucket origin as Timestamp */
|
||||
tbinfo->bucket_time_origin = DatumGetTimestamp(arg->constvalue);
|
||||
break;
|
||||
case TIMESTAMPTZOID:
|
||||
/* Bucket origin as TimestampTZ */
|
||||
tbinfo->bucket_time_origin = DatumGetTimestampTz(arg->constvalue);
|
||||
break;
|
||||
case INT2OID:
|
||||
/* Bucket offset as smallint */
|
||||
tbinfo->bucket_integer_offset = DatumGetInt16(arg->constvalue);
|
||||
break;
|
||||
case INT4OID:
|
||||
/* Bucket offset as int */
|
||||
tbinfo->bucket_integer_offset = DatumGetInt32(arg->constvalue);
|
||||
break;
|
||||
case INT8OID:
|
||||
/* Bucket offset as bigint */
|
||||
tbinfo->bucket_integer_offset = DatumGetInt64(arg->constvalue);
|
||||
break;
|
||||
default:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_TS_INTERNAL_ERROR),
|
||||
errmsg("unable to handle time_bucket parameter of type: %s",
|
||||
format_type_be(exprType((Node *) arg)))));
|
||||
pg_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the group-by clauses has exactly 1 time_bucket(.., <col>) where
|
||||
* <col> is the hypertable's partitioning column and other invariants. Then fill
|
||||
@ -213,36 +275,13 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
|
||||
if (list_length(fe->args) >= 3)
|
||||
{
|
||||
Const *arg = check_time_bucket_argument(lthird(fe->args), "third");
|
||||
if (exprType((Node *) arg) == TEXTOID)
|
||||
{
|
||||
const char *tz_name = TextDatumGetCString(arg->constvalue);
|
||||
if (!ts_is_valid_timezone_name(tz_name))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid timezone name \"%s\"", tz_name)));
|
||||
}
|
||||
|
||||
tbinfo->bucket_time_timezone = tz_name;
|
||||
}
|
||||
process_additional_timebucket_parameter(tbinfo, arg);
|
||||
}
|
||||
|
||||
if (list_length(fe->args) >= 4)
|
||||
{
|
||||
/* origin */
|
||||
Const *arg = check_time_bucket_argument(lfourth(fe->args), "fourth");
|
||||
if (exprType((Node *) arg) == TEXTOID)
|
||||
{
|
||||
const char *tz_name = TextDatumGetCString(arg->constvalue);
|
||||
if (!ts_is_valid_timezone_name(tz_name))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid timezone name \"%s\"", tz_name)));
|
||||
}
|
||||
|
||||
tbinfo->bucket_time_timezone = tz_name;
|
||||
}
|
||||
process_additional_timebucket_parameter(tbinfo, arg);
|
||||
}
|
||||
|
||||
/* Check for custom origin. */
|
||||
@ -250,7 +289,7 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
|
||||
{
|
||||
case DATEOID:
|
||||
/* Origin is always 3rd arg for date variants. */
|
||||
if (list_length(fe->args) == 3)
|
||||
if (list_length(fe->args) == 3 && exprType(lthird(fe->args)) == DATEOID)
|
||||
{
|
||||
Node *arg = lthird(fe->args);
|
||||
custom_origin = true;
|
||||
@ -262,7 +301,7 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
|
||||
break;
|
||||
case TIMESTAMPOID:
|
||||
/* Origin is always 3rd arg for timestamp variants. */
|
||||
if (list_length(fe->args) == 3)
|
||||
if (list_length(fe->args) == 3 && exprType(lthird(fe->args)) == TIMESTAMPOID)
|
||||
{
|
||||
Node *arg = lthird(fe->args);
|
||||
custom_origin = true;
|
||||
@ -274,9 +313,10 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
|
||||
/* Origin can be 3rd or 4th arg for timestamptz variants. */
|
||||
if (list_length(fe->args) >= 3 && exprType(lthird(fe->args)) == TIMESTAMPTZOID)
|
||||
{
|
||||
Node *arg = lthird(fe->args);
|
||||
custom_origin = true;
|
||||
tbinfo->bucket_time_origin =
|
||||
DatumGetTimestampTz(castNode(Const, lthird(fe->args))->constvalue);
|
||||
Const *constval = check_time_bucket_argument(arg, "third");
|
||||
tbinfo->bucket_time_origin = DatumGetTimestampTz(constval->constvalue);
|
||||
}
|
||||
else if (list_length(fe->args) >= 4 &&
|
||||
exprType(lfourth(fe->args)) == TIMESTAMPTZOID)
|
||||
@ -559,7 +599,8 @@ CAggTimebucketInfo
|
||||
cagg_validate_query(const Query *query, const bool finalized, const char *cagg_schema,
|
||||
const char *cagg_name, const bool is_cagg_create)
|
||||
{
|
||||
CAggTimebucketInfo bucket_info = { 0 }, bucket_info_parent;
|
||||
CAggTimebucketInfo bucket_info = { 0 };
|
||||
CAggTimebucketInfo bucket_info_parent = { 0 };
|
||||
Hypertable *ht = NULL, *ht_parent = NULL;
|
||||
RangeTblRef *rtref = NULL, *rtref_other = NULL;
|
||||
RangeTblEntry *rte = NULL, *rte_other = NULL;
|
||||
@ -891,6 +932,43 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create continuous aggregate on hypertable with row security")));
|
||||
|
||||
/* Test for broken time_bucket configurations (variable with with offset and origin). We need to
|
||||
* check only time based buckets since integer based buckets are always fixed. */
|
||||
bool time_offset_or_origin_set =
|
||||
(bucket_info.bucket_time_offset != NULL) ||
|
||||
(TIMESTAMP_NOT_FINITE(bucket_info.bucket_time_origin) == false);
|
||||
|
||||
/* Ignore time_bucket_ng in this check, since offset and origin were allowed in the past */
|
||||
FuncInfo *func_info = ts_func_cache_get_bucketing_func(bucket_info.bucket_func->funcid);
|
||||
bool is_time_bucket_ng = func_info->origin == ORIGIN_TIMESCALE_EXPERIMENTAL;
|
||||
|
||||
/*
|
||||
* Some time_bucket variants using variable-sized buckets and custom origin/offset values are
|
||||
* not behaving correctly. To prevent misaligned buckets, these variants are blocked at the
|
||||
* moment. This restriction can be removed as soon as time_bucket behaves correctly.
|
||||
*
|
||||
* --- Align with default origin ('midnight on January 1, 2000')
|
||||
* test2=# SELECT time_bucket('1 month', '2000-01-01 01:05:00 UTC'::timestamptz,
|
||||
* timezone=>'UTC'); time_bucket
|
||||
* ------------------------
|
||||
* 2000-01-01 00:00:00+00
|
||||
*
|
||||
* --- Using a custom origin
|
||||
* test2=# SELECT time_bucket('1 month', '2000-01-01 01:05:00 UTC'::timestamptz,
|
||||
* origin=>'2000-01-01 01:05:00 UTC'::timestamptz, timezone=>'UTC'); time_bucket
|
||||
* ------------------------
|
||||
* 2000-01-01 00:00:00+00 <--- Should be 2000-01-01 01:05:00+00
|
||||
* (1 row)
|
||||
*/
|
||||
if (time_bucket_info_has_fixed_width(&bucket_info) == false && time_offset_or_origin_set &&
|
||||
!is_time_bucket_ng)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create continuous aggregate with variable-width bucket using "
|
||||
"offset or origin.")));
|
||||
}
|
||||
|
||||
/* hierarchical cagg validations */
|
||||
if (is_hierarchical)
|
||||
{
|
||||
@ -975,6 +1053,85 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
|
||||
NameStr(cagg_parent->data.user_view_name),
|
||||
width_out_parent)));
|
||||
}
|
||||
|
||||
/* Test compatible time origin values */
|
||||
if (bucket_info.bucket_time_origin != bucket_info_parent.bucket_time_origin)
|
||||
{
|
||||
char *origin = DatumGetCString(
|
||||
DirectFunctionCall1(timestamptz_out,
|
||||
TimestampTzGetDatum(bucket_info.bucket_time_origin)));
|
||||
|
||||
char *origin_parent = DatumGetCString(
|
||||
DirectFunctionCall1(timestamptz_out,
|
||||
TimestampTzGetDatum(bucket_info_parent.bucket_time_origin)));
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg(
|
||||
"cannot create continuous aggregate with different bucket origin values"),
|
||||
errdetail("Time origin of \"%s.%s\" [%s] and \"%s.%s\" [%s] should be the "
|
||||
"same.",
|
||||
cagg_schema,
|
||||
cagg_name,
|
||||
origin,
|
||||
NameStr(cagg_parent->data.user_view_schema),
|
||||
NameStr(cagg_parent->data.user_view_name),
|
||||
origin_parent)));
|
||||
}
|
||||
|
||||
/* Test compatible time offset values */
|
||||
if (bucket_info.bucket_time_offset != NULL || bucket_info_parent.bucket_time_offset != NULL)
|
||||
{
|
||||
Datum offset_datum = IntervalPGetDatum(bucket_info.bucket_time_offset);
|
||||
Datum offset_datum_parent = IntervalPGetDatum(bucket_info_parent.bucket_time_offset);
|
||||
|
||||
bool both_buckets_are_equal = false;
|
||||
bool both_buckets_have_offset = (bucket_info.bucket_time_offset != NULL) &&
|
||||
(bucket_info_parent.bucket_time_offset != NULL);
|
||||
|
||||
if (both_buckets_have_offset)
|
||||
{
|
||||
both_buckets_are_equal = DatumGetBool(
|
||||
DirectFunctionCall2(interval_eq, offset_datum, offset_datum_parent));
|
||||
}
|
||||
|
||||
if (!both_buckets_are_equal)
|
||||
{
|
||||
char *offset = DatumGetCString(DirectFunctionCall1(interval_out, offset_datum));
|
||||
char *offset_parent =
|
||||
DatumGetCString(DirectFunctionCall1(interval_out, offset_datum_parent));
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create continuous aggregate with different bucket offset "
|
||||
"values"),
|
||||
errdetail("Time origin of \"%s.%s\" [%s] and \"%s.%s\" [%s] should be the "
|
||||
"same.",
|
||||
cagg_schema,
|
||||
cagg_name,
|
||||
offset,
|
||||
NameStr(cagg_parent->data.user_view_schema),
|
||||
NameStr(cagg_parent->data.user_view_name),
|
||||
offset_parent)));
|
||||
}
|
||||
}
|
||||
|
||||
/* Test compatible integer offset values */
|
||||
if (bucket_info.bucket_integer_offset != bucket_info_parent.bucket_integer_offset)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg(
|
||||
"cannot create continuous aggregate with different bucket offset values"),
|
||||
errdetail("Integer offset of \"%s.%s\" [" INT64_FORMAT
|
||||
"] and \"%s.%s\" [" INT64_FORMAT "] should be the same.",
|
||||
cagg_schema,
|
||||
cagg_name,
|
||||
bucket_info.bucket_integer_offset,
|
||||
NameStr(cagg_parent->data.user_view_schema),
|
||||
NameStr(cagg_parent->data.user_view_name),
|
||||
bucket_info_parent.bucket_integer_offset)));
|
||||
}
|
||||
}
|
||||
|
||||
return bucket_info;
|
||||
@ -1189,9 +1346,11 @@ makeRangeTblEntry(Query *query, const char *aliasname)
|
||||
* UNION ALL
|
||||
* SELECT * from q2 where existing_qual and <coale_qual>
|
||||
* where coale_qual is: time < ----> (or >= )
|
||||
* COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark( <htid>)),
|
||||
*
|
||||
* COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(<htid>)),
|
||||
* '-infinity'::timestamp with time zone)
|
||||
* See build_union_quals for COALESCE clauses.
|
||||
*
|
||||
* See build_union_query_quals for COALESCE clauses.
|
||||
*/
|
||||
Query *
|
||||
build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query *q2,
|
||||
@ -1225,9 +1384,7 @@ build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query
|
||||
/*
|
||||
* If there is join in CAgg definition then adjust varno
|
||||
* to get time column from the hypertable in the join.
|
||||
*/
|
||||
|
||||
/*
|
||||
*
|
||||
* In case of joins it is enough to check if the first node is not RangeTblRef,
|
||||
* because the jointree has RangeTblRef as leaves and JoinExpr above them.
|
||||
* So if JoinExpr is present, it is the first node.
|
||||
@ -1276,11 +1433,13 @@ build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query
|
||||
}
|
||||
else
|
||||
varno = list_length(q2->rtable);
|
||||
|
||||
q2_quals = build_union_query_quals(materialize_htid,
|
||||
tbinfo->htpartcoltype,
|
||||
get_negator(tce->lt_opr),
|
||||
varno,
|
||||
tbinfo->htpartcolno);
|
||||
|
||||
q2->jointree->quals = make_and_qual(q2->jointree->quals, q2_quals);
|
||||
|
||||
Query *query = makeNode(Query);
|
||||
|
@ -835,12 +835,28 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
|
||||
DirectFunctionCall1(timestamptz_out,
|
||||
TimestampTzGetDatum(bucket_info->bucket_time_origin)));
|
||||
}
|
||||
|
||||
if (bucket_info->bucket_time_offset != NULL)
|
||||
{
|
||||
bucket_offset = DatumGetCString(
|
||||
DirectFunctionCall1(interval_out,
|
||||
IntervalPGetDatum(bucket_info->bucket_time_offset)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Bucket on integers */
|
||||
/* Bucketing on integers */
|
||||
bucket_width = palloc0(MAXINT8LEN + 1 * sizeof(char));
|
||||
pg_lltoa(bucket_info->bucket_integer_width, bucket_width);
|
||||
|
||||
/* Integer buckets with origin are not supported, so noting to do. */
|
||||
Assert(bucket_origin == NULL);
|
||||
|
||||
if (bucket_info->bucket_integer_offset != 0)
|
||||
{
|
||||
bucket_offset = palloc0(MAXINT8LEN + 1 * sizeof(char));
|
||||
pg_lltoa(bucket_info->bucket_integer_offset, bucket_offset);
|
||||
}
|
||||
}
|
||||
|
||||
create_bucket_function_catalog_entry(materialize_hypertable_id,
|
||||
|
@ -1015,7 +1015,8 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange
|
||||
if (count && tuplestore_tuple_count(store->tupstore) > max_materializations)
|
||||
{
|
||||
InternalTimeRange merged_refresh_window;
|
||||
continuous_agg_calculate_merged_refresh_window(refresh_window,
|
||||
continuous_agg_calculate_merged_refresh_window(cagg,
|
||||
refresh_window,
|
||||
store,
|
||||
state.bucket_function,
|
||||
&merged_refresh_window,
|
||||
|
@ -4,23 +4,24 @@
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
#include <postgres.h>
|
||||
#include <executor/spi.h>
|
||||
#include <fmgr.h>
|
||||
#include <scan_iterator.h>
|
||||
#include <scanner.h>
|
||||
#include <time_utils.h>
|
||||
#include <compat/compat.h>
|
||||
#include <executor/spi.h>
|
||||
#include <lib/stringinfo.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/date.h>
|
||||
#include <utils/palloc.h>
|
||||
#include <utils/rel.h>
|
||||
#include <utils/relcache.h>
|
||||
#include <utils/date.h>
|
||||
#include <utils/snapmgr.h>
|
||||
#include <utils/timestamp.h>
|
||||
|
||||
#include <scanner.h>
|
||||
#include <compat/compat.h>
|
||||
#include <scan_iterator.h>
|
||||
#include "ts_catalog/continuous_agg.h"
|
||||
#include "ts_catalog/continuous_aggs_watermark.h"
|
||||
#include <time_utils.h>
|
||||
#include "debug_assert.h"
|
||||
|
||||
#include "materialize.h"
|
||||
|
||||
#define CHUNKIDFROMRELID "chunk_id_from_relid"
|
||||
@ -37,7 +38,8 @@ static Datum internal_to_time_value_or_infinite(int64 internal, Oid time_type,
|
||||
* materialization support *
|
||||
***************************/
|
||||
|
||||
static void spi_update_materializations(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
|
||||
SchemaAndName partial_view,
|
||||
SchemaAndName materialization_table,
|
||||
const NameData *time_column_name,
|
||||
TimeRange invalidation_range, const int32 chunk_id);
|
||||
@ -45,14 +47,16 @@ static void spi_delete_materializations(SchemaAndName materialization_table,
|
||||
const NameData *time_column_name,
|
||||
TimeRange invalidation_range,
|
||||
const char *const chunk_condition);
|
||||
static void spi_insert_materializations(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
static void spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
|
||||
SchemaAndName partial_view,
|
||||
SchemaAndName materialization_table,
|
||||
const NameData *time_column_name,
|
||||
TimeRange materialization_range,
|
||||
const char *const chunk_condition);
|
||||
|
||||
void
|
||||
continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
|
||||
SchemaAndName partial_view,
|
||||
SchemaAndName materialization_table,
|
||||
const NameData *time_column_name,
|
||||
InternalTimeRange new_materialization_range,
|
||||
@ -101,6 +105,7 @@ continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName partial_
|
||||
if (range_length(invalidation_range) == 0 || !materialize_invalidations_separately)
|
||||
{
|
||||
spi_update_materializations(mat_ht,
|
||||
cagg,
|
||||
partial_view,
|
||||
materialization_table,
|
||||
time_column_name,
|
||||
@ -111,6 +116,7 @@ continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName partial_
|
||||
else
|
||||
{
|
||||
spi_update_materializations(mat_ht,
|
||||
cagg,
|
||||
partial_view,
|
||||
materialization_table,
|
||||
time_column_name,
|
||||
@ -118,6 +124,7 @@ continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName partial_
|
||||
chunk_id);
|
||||
|
||||
spi_update_materializations(mat_ht,
|
||||
cagg,
|
||||
partial_view,
|
||||
materialization_table,
|
||||
time_column_name,
|
||||
@ -215,9 +222,10 @@ internal_time_range_to_time_range(InternalTimeRange internal)
|
||||
}
|
||||
|
||||
static void
|
||||
spi_update_materializations(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
SchemaAndName materialization_table, const NameData *time_column_name,
|
||||
TimeRange invalidation_range, const int32 chunk_id)
|
||||
spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
|
||||
SchemaAndName partial_view, SchemaAndName materialization_table,
|
||||
const NameData *time_column_name, TimeRange invalidation_range,
|
||||
const int32 chunk_id)
|
||||
{
|
||||
StringInfo chunk_condition = makeStringInfo();
|
||||
|
||||
@ -235,6 +243,7 @@ spi_update_materializations(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
invalidation_range,
|
||||
chunk_condition->data);
|
||||
spi_insert_materializations(mat_ht,
|
||||
cagg,
|
||||
partial_view,
|
||||
materialization_table,
|
||||
time_column_name,
|
||||
@ -284,9 +293,10 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData
|
||||
}
|
||||
|
||||
static void
|
||||
spi_insert_materializations(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
SchemaAndName materialization_table, const NameData *time_column_name,
|
||||
TimeRange materialization_range, const char *const chunk_condition)
|
||||
spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
|
||||
SchemaAndName partial_view, SchemaAndName materialization_table,
|
||||
const NameData *time_column_name, TimeRange materialization_range,
|
||||
const char *const chunk_condition)
|
||||
{
|
||||
int res;
|
||||
StringInfo command = makeStringInfo();
|
||||
|
@ -35,7 +35,8 @@ typedef struct InternalTimeRange
|
||||
int64 end; /* exclusive */
|
||||
} InternalTimeRange;
|
||||
|
||||
void continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName partial_view,
|
||||
void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
|
||||
SchemaAndName partial_view,
|
||||
SchemaAndName materialization_table,
|
||||
const NameData *time_column_name,
|
||||
InternalTimeRange new_materialization_range,
|
||||
|
@ -4,17 +4,19 @@
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
#include <postgres.h>
|
||||
#include <utils/acl.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/fmgrprotos.h>
|
||||
#include <utils/snapmgr.h>
|
||||
#include <utils/guc.h>
|
||||
#include <utils/builtins.h>
|
||||
|
||||
#include <access/xact.h>
|
||||
#include <storage/lmgr.h>
|
||||
#include <miscadmin.h>
|
||||
#include <fmgr.h>
|
||||
#include <executor/spi.h>
|
||||
#include <fmgr.h>
|
||||
#include <miscadmin.h>
|
||||
#include <storage/lmgr.h>
|
||||
#include <utils/acl.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/date.h>
|
||||
#include <utils/fmgrprotos.h>
|
||||
#include <utils/guc.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/snapmgr.h>
|
||||
|
||||
#include "ts_catalog/catalog.h"
|
||||
#include "ts_catalog/continuous_agg.h"
|
||||
@ -44,10 +46,12 @@ typedef struct CaggRefreshState
|
||||
static Hypertable *cagg_get_hypertable_or_fail(int32 hypertable_id);
|
||||
static InternalTimeRange get_largest_bucketed_window(Oid timetype, int64 bucket_width);
|
||||
static InternalTimeRange
|
||||
compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
|
||||
compute_inscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *const refresh_window,
|
||||
const int64 bucket_width);
|
||||
static InternalTimeRange
|
||||
compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
|
||||
compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *const refresh_window,
|
||||
const ContinuousAggsBucketFunction *bucket_function);
|
||||
static void continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *refresh_window);
|
||||
@ -75,6 +79,9 @@ static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *refresh_window,
|
||||
const CaggRefreshCallContext callctx,
|
||||
int32 chunk_id);
|
||||
static void fill_bucket_offset_origin(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *const refresh_window,
|
||||
NullableDatum *offset, NullableDatum *origin);
|
||||
|
||||
static Hypertable *
|
||||
cagg_get_hypertable_or_fail(int32 hypertable_id)
|
||||
@ -144,9 +151,14 @@ get_largest_bucketed_window(Oid timetype, int64 bucket_width)
|
||||
* where part of its data were dropped by a retention policy. See #2198 for details.
|
||||
*/
|
||||
static InternalTimeRange
|
||||
compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
|
||||
compute_inscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *const refresh_window,
|
||||
const int64 bucket_width)
|
||||
{
|
||||
Assert(cagg != NULL);
|
||||
Assert(cagg->bucket_function != NULL);
|
||||
|
||||
NullableDatum NULL_DATUM = INIT_NULL_DATUM;
|
||||
InternalTimeRange result = *refresh_window;
|
||||
InternalTimeRange largest_bucketed_window =
|
||||
get_largest_bucketed_window(refresh_window->type, bucket_width);
|
||||
@ -164,7 +176,11 @@ compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh
|
||||
int64 included_bucket =
|
||||
ts_time_saturating_add(refresh_window->start, bucket_width - 1, refresh_window->type);
|
||||
/* Get the start of the included bucket. */
|
||||
result.start = ts_time_bucket_by_type(bucket_width, included_bucket, refresh_window->type);
|
||||
result.start = ts_time_bucket_by_type_extended(bucket_width,
|
||||
included_bucket,
|
||||
refresh_window->type,
|
||||
NULL_DATUM,
|
||||
NULL_DATUM);
|
||||
}
|
||||
|
||||
if (refresh_window->end >= largest_bucketed_window.end)
|
||||
@ -175,12 +191,85 @@ compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh
|
||||
{
|
||||
/* The window is reduced to the beginning of the bucket, which contains the exclusive
|
||||
* end of the refresh window. */
|
||||
result.end =
|
||||
ts_time_bucket_by_type(bucket_width, refresh_window->end, refresh_window->type);
|
||||
result.end = ts_time_bucket_by_type_extended(bucket_width,
|
||||
refresh_window->end,
|
||||
refresh_window->type,
|
||||
NULL_DATUM,
|
||||
NULL_DATUM);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the offset as Datum value of an integer based bucket
|
||||
*/
|
||||
static Datum
|
||||
int_bucket_offset_to_datum(Oid type, const ContinuousAggsBucketFunction *bucket_function)
|
||||
{
|
||||
Assert(bucket_function->bucket_time_based == false);
|
||||
|
||||
switch (type)
|
||||
{
|
||||
case INT2OID:
|
||||
return Int16GetDatum(bucket_function->bucket_integer_offset);
|
||||
case INT4OID:
|
||||
return Int32GetDatum(bucket_function->bucket_integer_offset);
|
||||
case INT8OID:
|
||||
return Int64GetDatum(bucket_function->bucket_integer_offset);
|
||||
default:
|
||||
elog(ERROR, "invalid integer time_bucket type \"%s\"", format_type_be(type));
|
||||
pg_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Get a NullableDatum for offset and origin based on the CAgg information
|
||||
*/
|
||||
static void
|
||||
fill_bucket_offset_origin(const ContinuousAgg *cagg, const InternalTimeRange *const refresh_window,
|
||||
NullableDatum *offset, NullableDatum *origin)
|
||||
{
|
||||
Assert(cagg != NULL);
|
||||
Assert(offset != NULL);
|
||||
Assert(origin != NULL);
|
||||
Assert(offset->isnull);
|
||||
Assert(origin->isnull);
|
||||
|
||||
if (cagg->bucket_function->bucket_time_based)
|
||||
{
|
||||
if (cagg->bucket_function->bucket_time_offset != NULL)
|
||||
{
|
||||
offset->isnull = false;
|
||||
offset->value = IntervalPGetDatum(cagg->bucket_function->bucket_time_offset);
|
||||
}
|
||||
|
||||
if (TIMESTAMP_NOT_FINITE(cagg->bucket_function->bucket_time_origin) == false)
|
||||
{
|
||||
origin->isnull = false;
|
||||
if (refresh_window->type == DATEOID)
|
||||
{
|
||||
/* Date was converted into a timestamp in process_additional_timebucket_parameter(),
|
||||
* build a Date again */
|
||||
origin->value = DirectFunctionCall1(timestamp_date,
|
||||
TimestampGetDatum(
|
||||
cagg->bucket_function->bucket_time_origin));
|
||||
}
|
||||
else
|
||||
{
|
||||
origin->value = TimestampGetDatum(cagg->bucket_function->bucket_time_origin);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (cagg->bucket_function->bucket_integer_offset != 0)
|
||||
{
|
||||
offset->isnull = false;
|
||||
offset->value = int_bucket_offset_to_datum(refresh_window->type, cagg->bucket_function);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Adjust the refresh window to align with circumscribed buckets, so it includes buckets, which
|
||||
* fully cover the refresh window.
|
||||
@ -209,9 +298,13 @@ compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh
|
||||
* dropping chunks manually or as part of retention policy.
|
||||
*/
|
||||
static InternalTimeRange
|
||||
compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
|
||||
compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *const refresh_window,
|
||||
const ContinuousAggsBucketFunction *bucket_function)
|
||||
{
|
||||
Assert(cagg != NULL);
|
||||
Assert(cagg->bucket_function != NULL);
|
||||
|
||||
if (bucket_function->bucket_fixed_interval == false)
|
||||
{
|
||||
InternalTimeRange result = *refresh_window;
|
||||
@ -229,6 +322,14 @@ compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const ref
|
||||
InternalTimeRange largest_bucketed_window =
|
||||
get_largest_bucketed_window(refresh_window->type, bucket_width);
|
||||
|
||||
/* Get offset and origin for bucket function */
|
||||
NullableDatum offset = INIT_NULL_DATUM;
|
||||
NullableDatum origin = INIT_NULL_DATUM;
|
||||
fill_bucket_offset_origin(cagg, refresh_window, &offset, &origin);
|
||||
|
||||
/* Defined offset and origin in one function is not supported */
|
||||
Assert(offset.isnull == true || origin.isnull == true);
|
||||
|
||||
if (refresh_window->start <= largest_bucketed_window.start)
|
||||
{
|
||||
result.start = largest_bucketed_window.start;
|
||||
@ -237,8 +338,11 @@ compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const ref
|
||||
{
|
||||
/* For alignment with a bucket, which includes the start of the refresh window, we just
|
||||
* need to get start of the bucket. */
|
||||
result.start =
|
||||
ts_time_bucket_by_type(bucket_width, refresh_window->start, refresh_window->type);
|
||||
result.start = ts_time_bucket_by_type_extended(bucket_width,
|
||||
refresh_window->start,
|
||||
refresh_window->type,
|
||||
offset,
|
||||
origin);
|
||||
}
|
||||
|
||||
if (refresh_window->end >= largest_bucketed_window.end)
|
||||
@ -256,7 +360,11 @@ compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const ref
|
||||
* bucketing in case we're already at the end of the bucket (we don't
|
||||
* want to add an extra bucket). */
|
||||
exclusive_end = ts_time_saturating_sub(refresh_window->end, 1, refresh_window->type);
|
||||
bucketed_end = ts_time_bucket_by_type(bucket_width, exclusive_end, refresh_window->type);
|
||||
bucketed_end = ts_time_bucket_by_type_extended(bucket_width,
|
||||
exclusive_end,
|
||||
refresh_window->type,
|
||||
offset,
|
||||
origin);
|
||||
|
||||
/* We get the time value for the start of the bucket, so need to add
|
||||
* bucket_width to get the end of it. */
|
||||
@ -310,6 +418,7 @@ continuous_agg_refresh_execute(const CaggRefreshState *refresh,
|
||||
Assert(time_dim != NULL);
|
||||
|
||||
continuous_agg_update_materialization(refresh->cagg_ht,
|
||||
&refresh->cagg,
|
||||
refresh->partial_view,
|
||||
cagg_hypertable_name,
|
||||
&time_dim->fd.column_name,
|
||||
@ -382,7 +491,8 @@ update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
|
||||
}
|
||||
|
||||
static long
|
||||
continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_window,
|
||||
continuous_agg_scan_refresh_window_ranges(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *refresh_window,
|
||||
const InvalidationStore *invalidations,
|
||||
const ContinuousAggsBucketFunction *bucket_function,
|
||||
const CaggRefreshCallContext callctx,
|
||||
@ -408,6 +518,7 @@ continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_windo
|
||||
slot,
|
||||
Anum_continuous_aggs_materialization_invalidation_log_greatest_modified_value,
|
||||
&isnull);
|
||||
|
||||
InternalTimeRange invalidation = {
|
||||
.type = refresh_window->type,
|
||||
.start = DatumGetInt64(start),
|
||||
@ -417,7 +528,7 @@ continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_windo
|
||||
};
|
||||
|
||||
InternalTimeRange bucketed_refresh_window =
|
||||
compute_circumscribed_bucketed_refresh_window(&invalidation, bucket_function);
|
||||
compute_circumscribed_bucketed_refresh_window(cagg, &invalidation, bucket_function);
|
||||
|
||||
(*exec_func)(&bucketed_refresh_window, callctx, count, func_arg1, func_arg2);
|
||||
|
||||
@ -498,7 +609,8 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
|
||||
else
|
||||
{
|
||||
long count pg_attribute_unused();
|
||||
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
|
||||
count = continuous_agg_scan_refresh_window_ranges(cagg,
|
||||
refresh_window,
|
||||
invalidations,
|
||||
cagg->bucket_function,
|
||||
callctx,
|
||||
@ -597,14 +709,16 @@ emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext c
|
||||
}
|
||||
|
||||
void
|
||||
continuous_agg_calculate_merged_refresh_window(const InternalTimeRange *refresh_window,
|
||||
continuous_agg_calculate_merged_refresh_window(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *refresh_window,
|
||||
const InvalidationStore *invalidations,
|
||||
const ContinuousAggsBucketFunction *bucket_function,
|
||||
InternalTimeRange *merged_refresh_window,
|
||||
const CaggRefreshCallContext callctx)
|
||||
{
|
||||
long count pg_attribute_unused();
|
||||
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
|
||||
count = continuous_agg_scan_refresh_window_ranges(cagg,
|
||||
refresh_window,
|
||||
invalidations,
|
||||
bucket_function,
|
||||
callctx,
|
||||
@ -721,7 +835,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
||||
int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
|
||||
Assert(bucket_width > 0);
|
||||
refresh_window =
|
||||
compute_inscribed_bucketed_refresh_window(refresh_window_arg, bucket_width);
|
||||
compute_inscribed_bucketed_refresh_window(cagg, refresh_window_arg, bucket_width);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,9 +14,9 @@
|
||||
|
||||
extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
|
||||
extern void continuous_agg_calculate_merged_refresh_window(
|
||||
const InternalTimeRange *refresh_window, const InvalidationStore *invalidations,
|
||||
const ContinuousAggsBucketFunction *bucket_function, InternalTimeRange *merged_refresh_window,
|
||||
const CaggRefreshCallContext callctx);
|
||||
const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
|
||||
const InvalidationStore *invalidations, const ContinuousAggsBucketFunction *bucket_function,
|
||||
InternalTimeRange *merged_refresh_window, const CaggRefreshCallContext callctx);
|
||||
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
||||
const InternalTimeRange *refresh_window,
|
||||
const CaggRefreshCallContext callctx,
|
||||
|
@ -1811,29 +1811,36 @@ SELECT * FROM cashflows;
|
||||
-- 3. test named ts
|
||||
-- 4. test named bucket width
|
||||
-- named origin
|
||||
-- Currently not supported due to a bug in time_bucket (see comment in cagg_validate_query)
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_named_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named timezone
|
||||
CREATE MATERIALIZED VIEW cagg_named_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named ts
|
||||
CREATE MATERIALIZED VIEW cagg_named_ts_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named bucket width
|
||||
CREATE MATERIALIZED VIEW cagg_named_all WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket(bucket_width => '1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
\set ON_ERROR_STOP 1
|
||||
-- Refreshing from the beginning (NULL) of a CAGG with variable time bucket and
|
||||
-- using an INTERVAL for the end timestamp (issue #5534)
|
||||
CREATE MATERIALIZED VIEW transactions_montly
|
||||
@ -1916,7 +1923,7 @@ CREATE TABLE conditions (
|
||||
SELECT create_hypertable('conditions', 'time');
|
||||
create_hypertable
|
||||
--------------------------
|
||||
(52,public,conditions,t)
|
||||
(48,public,conditions,t)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO conditions VALUES ( '2018-01-01 09:20:00-08', 'SFO', 55);
|
||||
@ -1946,10 +1953,10 @@ WITH NO DATA;
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
-- Should return NO ROWS
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
@ -1966,17 +1973,17 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=fals
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53
|
||||
WHERE _materialized_hypertable_53.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49
|
||||
WHERE _materialized_hypertable_49.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
UNION ALL
|
||||
SELECT conditions.location,
|
||||
time_bucket('@ 1 day'::interval, conditions."time") AS bucket,
|
||||
avg(conditions.temperature) AS avg
|
||||
FROM conditions
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
GROUP BY conditions.location, (time_bucket('@ 1 day'::interval, conditions."time"));
|
||||
|
||||
-- Should return ROWS because now it is realtime
|
||||
@ -2001,10 +2008,10 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=true
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
|
@ -1811,29 +1811,36 @@ SELECT * FROM cashflows;
|
||||
-- 3. test named ts
|
||||
-- 4. test named bucket width
|
||||
-- named origin
|
||||
-- Currently not supported due to a bug in time_bucket (see comment in cagg_validate_query)
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_named_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named timezone
|
||||
CREATE MATERIALIZED VIEW cagg_named_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named ts
|
||||
CREATE MATERIALIZED VIEW cagg_named_ts_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named bucket width
|
||||
CREATE MATERIALIZED VIEW cagg_named_all WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket(bucket_width => '1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
\set ON_ERROR_STOP 1
|
||||
-- Refreshing from the beginning (NULL) of a CAGG with variable time bucket and
|
||||
-- using an INTERVAL for the end timestamp (issue #5534)
|
||||
CREATE MATERIALIZED VIEW transactions_montly
|
||||
@ -1916,7 +1923,7 @@ CREATE TABLE conditions (
|
||||
SELECT create_hypertable('conditions', 'time');
|
||||
create_hypertable
|
||||
--------------------------
|
||||
(52,public,conditions,t)
|
||||
(48,public,conditions,t)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO conditions VALUES ( '2018-01-01 09:20:00-08', 'SFO', 55);
|
||||
@ -1946,10 +1953,10 @@ WITH NO DATA;
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
-- Should return NO ROWS
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
@ -1966,17 +1973,17 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=fals
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53
|
||||
WHERE _materialized_hypertable_53.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49
|
||||
WHERE _materialized_hypertable_49.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
UNION ALL
|
||||
SELECT conditions.location,
|
||||
time_bucket('@ 1 day'::interval, conditions."time") AS bucket,
|
||||
avg(conditions.temperature) AS avg
|
||||
FROM conditions
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
GROUP BY conditions.location, (time_bucket('@ 1 day'::interval, conditions."time"));
|
||||
|
||||
-- Should return ROWS because now it is realtime
|
||||
@ -2001,10 +2008,10 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=true
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
|
@ -1811,29 +1811,36 @@ SELECT * FROM cashflows;
|
||||
-- 3. test named ts
|
||||
-- 4. test named bucket width
|
||||
-- named origin
|
||||
-- Currently not supported due to a bug in time_bucket (see comment in cagg_validate_query)
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_named_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named timezone
|
||||
CREATE MATERIALIZED VIEW cagg_named_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named ts
|
||||
CREATE MATERIALIZED VIEW cagg_named_ts_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named bucket width
|
||||
CREATE MATERIALIZED VIEW cagg_named_all WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket(bucket_width => '1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
\set ON_ERROR_STOP 1
|
||||
-- Refreshing from the beginning (NULL) of a CAGG with variable time bucket and
|
||||
-- using an INTERVAL for the end timestamp (issue #5534)
|
||||
CREATE MATERIALIZED VIEW transactions_montly
|
||||
@ -1916,7 +1923,7 @@ CREATE TABLE conditions (
|
||||
SELECT create_hypertable('conditions', 'time');
|
||||
create_hypertable
|
||||
--------------------------
|
||||
(52,public,conditions,t)
|
||||
(48,public,conditions,t)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO conditions VALUES ( '2018-01-01 09:20:00-08', 'SFO', 55);
|
||||
@ -1946,10 +1953,10 @@ WITH NO DATA;
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
-- Should return NO ROWS
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
@ -1966,17 +1973,17 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=fals
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53
|
||||
WHERE _materialized_hypertable_53.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49
|
||||
WHERE _materialized_hypertable_49.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
UNION ALL
|
||||
SELECT conditions.location,
|
||||
time_bucket('@ 1 day'::interval, conditions."time") AS bucket,
|
||||
avg(conditions.temperature) AS avg
|
||||
FROM conditions
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
GROUP BY conditions.location, (time_bucket('@ 1 day'::interval, conditions."time"));
|
||||
|
||||
-- Should return ROWS because now it is realtime
|
||||
@ -2001,10 +2008,10 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=true
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
|
@ -1811,29 +1811,36 @@ SELECT * FROM cashflows;
|
||||
-- 3. test named ts
|
||||
-- 4. test named bucket width
|
||||
-- named origin
|
||||
-- Currently not supported due to a bug in time_bucket (see comment in cagg_validate_query)
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_named_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named timezone
|
||||
CREATE MATERIALIZED VIEW cagg_named_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named ts
|
||||
CREATE MATERIALIZED VIEW cagg_named_ts_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- named bucket width
|
||||
CREATE MATERIALIZED VIEW cagg_named_all WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket(bucket_width => '1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
\set ON_ERROR_STOP 1
|
||||
-- Refreshing from the beginning (NULL) of a CAGG with variable time bucket and
|
||||
-- using an INTERVAL for the end timestamp (issue #5534)
|
||||
CREATE MATERIALIZED VIEW transactions_montly
|
||||
@ -1916,7 +1923,7 @@ CREATE TABLE conditions (
|
||||
SELECT create_hypertable('conditions', 'time');
|
||||
create_hypertable
|
||||
--------------------------
|
||||
(52,public,conditions,t)
|
||||
(48,public,conditions,t)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO conditions VALUES ( '2018-01-01 09:20:00-08', 'SFO', 55);
|
||||
@ -1949,7 +1956,7 @@ View definition:
|
||||
SELECT location,
|
||||
bucket,
|
||||
avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
-- Should return NO ROWS
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
@ -1966,17 +1973,17 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=fals
|
||||
bucket | timestamp with time zone | | | | plain |
|
||||
avg | double precision | | | | plain |
|
||||
View definition:
|
||||
SELECT _materialized_hypertable_53.location,
|
||||
_materialized_hypertable_53.bucket,
|
||||
_materialized_hypertable_53.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53
|
||||
WHERE _materialized_hypertable_53.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
SELECT _materialized_hypertable_49.location,
|
||||
_materialized_hypertable_49.bucket,
|
||||
_materialized_hypertable_49.avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_49
|
||||
WHERE _materialized_hypertable_49.bucket < COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
UNION ALL
|
||||
SELECT conditions.location,
|
||||
time_bucket('@ 1 day'::interval, conditions."time") AS bucket,
|
||||
avg(conditions.temperature) AS avg
|
||||
FROM conditions
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(53)), '-infinity'::timestamp with time zone)
|
||||
WHERE conditions."time" >= COALESCE(_timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(49)), '-infinity'::timestamp with time zone)
|
||||
GROUP BY conditions.location, (time_bucket('@ 1 day'::interval, conditions."time"));
|
||||
|
||||
-- Should return ROWS because now it is realtime
|
||||
@ -2004,7 +2011,7 @@ View definition:
|
||||
SELECT location,
|
||||
bucket,
|
||||
avg
|
||||
FROM _timescaledb_internal._materialized_hypertable_53;
|
||||
FROM _timescaledb_internal._materialized_hypertable_49;
|
||||
|
||||
CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
|
||||
SELECT * FROM conditions_daily ORDER BY bucket, avg;
|
||||
|
@ -85,13 +85,6 @@ Select max(temperature)
|
||||
from conditions
|
||||
group by time_bucket('1week', timec) , time_bucket('1month', timec), location WITH NO DATA;
|
||||
ERROR: continuous aggregate view cannot contain multiple time bucket functions
|
||||
--time_bucket using additional args
|
||||
CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS
|
||||
Select max(temperature)
|
||||
from conditions
|
||||
group by time_bucket( INTERVAL '5 minutes', timec, INTERVAL '-2.5 minutes') , location WITH NO DATA;
|
||||
ERROR: continuous aggregate view must include a valid time bucket function
|
||||
--time_bucket using non-const for first argument
|
||||
CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -467,18 +467,11 @@ SELECT * FROM cagg2;
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- custom origin
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
NOTICE: refreshing continuous aggregate "cagg3"
|
||||
SELECT * FROM cagg3;
|
||||
time_bucket
|
||||
------------------------------
|
||||
Wed Dec 01 00:00:00 1999 PST
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- offset not supported atm
|
||||
-- custom origin - not supported due to variable size
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- offset - not supported due to variable size
|
||||
CREATE MATERIALIZED VIEW cagg4 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', "offset":= INTERVAL '15 day') FROM metrics GROUP BY 1;
|
||||
ERROR: continuous aggregate view must include a valid time bucket function
|
||||
\set ON_ERROR_STOP 1
|
||||
@ -506,8 +499,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | t
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- dropping chunk should also remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-01 00:00:00-02'::timestamptz);
|
||||
@ -540,8 +532,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | f
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- cagg1 now is in the old format (finalized=false)
|
||||
-- dropping chunk should NOT remove the catalog data
|
||||
@ -573,8 +564,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
user_view_name | finalized
|
||||
----------------+-----------
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
-- dropping chunk should remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-25 00:00:00-02'::timestamptz);
|
||||
|
@ -467,18 +467,11 @@ SELECT * FROM cagg2;
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- custom origin
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
NOTICE: refreshing continuous aggregate "cagg3"
|
||||
SELECT * FROM cagg3;
|
||||
time_bucket
|
||||
------------------------------
|
||||
Wed Dec 01 00:00:00 1999 PST
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- offset not supported atm
|
||||
-- custom origin - not supported due to variable size
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- offset - not supported due to variable size
|
||||
CREATE MATERIALIZED VIEW cagg4 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', "offset":= INTERVAL '15 day') FROM metrics GROUP BY 1;
|
||||
ERROR: continuous aggregate view must include a valid time bucket function
|
||||
\set ON_ERROR_STOP 1
|
||||
@ -506,8 +499,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | t
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- dropping chunk should also remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-01 00:00:00-02'::timestamptz);
|
||||
@ -540,8 +532,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | f
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- cagg1 now is in the old format (finalized=false)
|
||||
-- dropping chunk should NOT remove the catalog data
|
||||
@ -573,8 +564,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
user_view_name | finalized
|
||||
----------------+-----------
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
-- dropping chunk should remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-25 00:00:00-02'::timestamptz);
|
||||
|
@ -467,18 +467,11 @@ SELECT * FROM cagg2;
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- custom origin
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
NOTICE: refreshing continuous aggregate "cagg3"
|
||||
SELECT * FROM cagg3;
|
||||
time_bucket
|
||||
------------------------------
|
||||
Wed Dec 01 00:00:00 1999 PST
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- offset not supported atm
|
||||
-- custom origin - not supported due to variable size
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- offset - not supported due to variable size
|
||||
CREATE MATERIALIZED VIEW cagg4 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', "offset":= INTERVAL '15 day') FROM metrics GROUP BY 1;
|
||||
ERROR: continuous aggregate view must include a valid time bucket function
|
||||
\set ON_ERROR_STOP 1
|
||||
@ -506,8 +499,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | t
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- dropping chunk should also remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-01 00:00:00-02'::timestamptz);
|
||||
@ -540,8 +532,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | f
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- cagg1 now is in the old format (finalized=false)
|
||||
-- dropping chunk should NOT remove the catalog data
|
||||
@ -573,8 +564,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
user_view_name | finalized
|
||||
----------------+-----------
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
-- dropping chunk should remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-25 00:00:00-02'::timestamptz);
|
||||
|
@ -467,18 +467,11 @@ SELECT * FROM cagg2;
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- custom origin
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
NOTICE: refreshing continuous aggregate "cagg3"
|
||||
SELECT * FROM cagg3;
|
||||
time_bucket
|
||||
------------------------------
|
||||
Wed Dec 01 00:00:00 1999 PST
|
||||
Sat Jan 01 00:00:00 2000 PST
|
||||
(2 rows)
|
||||
|
||||
-- offset not supported atm
|
||||
-- custom origin - not supported due to variable size
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
ERROR: cannot create continuous aggregate with variable-width bucket using offset or origin.
|
||||
-- offset - not supported due to variable size
|
||||
CREATE MATERIALIZED VIEW cagg4 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', "offset":= INTERVAL '15 day') FROM metrics GROUP BY 1;
|
||||
ERROR: continuous aggregate view must include a valid time bucket function
|
||||
\set ON_ERROR_STOP 1
|
||||
@ -506,8 +499,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | t
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- dropping chunk should also remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-01 00:00:00-02'::timestamptz);
|
||||
@ -540,8 +532,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
----------------+-----------
|
||||
cagg1 | f
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
-- cagg1 now is in the old format (finalized=false)
|
||||
-- dropping chunk should NOT remove the catalog data
|
||||
@ -573,8 +564,7 @@ SELECT user_view_name, finalized FROM _timescaledb_catalog.continuous_agg WHERE
|
||||
user_view_name | finalized
|
||||
----------------+-----------
|
||||
cagg2 | t
|
||||
cagg3 | t
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
-- dropping chunk should remove the catalog data
|
||||
SELECT drop_chunks('metrics', older_than => '2000-01-25 00:00:00-02'::timestamptz);
|
||||
|
@ -130,23 +130,23 @@ SELECT * FROM cagg_validate_query($$ SELECT relkind, count(*) FROM pg_catalog.pg
|
||||
|
||||
-- time_bucket with offset is not allowed
|
||||
SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", "offset" => '-1 minute'::interval), count(*) FROM metrics GROUP BY 1 $$);
|
||||
is_valid | error_level | error_code | error_message | error_detail | error_hint
|
||||
----------+-------------+------------+---------------------------------------------------------------------+--------------+------------
|
||||
f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | |
|
||||
is_valid | error_level | error_code | error_message | error_detail | error_hint
|
||||
----------+-------------+------------+---------------+--------------+------------
|
||||
t | | | | |
|
||||
(1 row)
|
||||
|
||||
-- time_bucket with origin is not allowed
|
||||
SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", origin => '2023-01-01'::timestamptz), count(*) FROM metrics GROUP BY 1 $$);
|
||||
is_valid | error_level | error_code | error_message | error_detail | error_hint
|
||||
----------+-------------+------------+---------------------------------------------------------------------+--------------+------------
|
||||
f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | |
|
||||
is_valid | error_level | error_code | error_message | error_detail | error_hint
|
||||
----------+-------------+------------+---------------+--------------+------------
|
||||
t | | | | |
|
||||
(1 row)
|
||||
|
||||
-- time_bucket with origin is not allowed
|
||||
SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", origin => '2023-01-01'::timestamptz), count(*) FROM metrics GROUP BY 1 $$);
|
||||
is_valid | error_level | error_code | error_message | error_detail | error_hint
|
||||
----------+-------------+------------+---------------------------------------------------------------------+--------------+------------
|
||||
f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | |
|
||||
is_valid | error_level | error_code | error_message | error_detail | error_hint
|
||||
----------+-------------+------------+---------------+--------------+------------
|
||||
t | | | | |
|
||||
(1 row)
|
||||
|
||||
-- time_bucket_gapfill is not allowed
|
||||
|
@ -1163,17 +1163,22 @@ SELECT * FROM cashflows;
|
||||
-- 3. test named ts
|
||||
-- 4. test named bucket width
|
||||
-- named origin
|
||||
|
||||
-- Currently not supported due to a bug in time_bucket (see comment in cagg_validate_query)
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_named_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
|
||||
-- named timezone
|
||||
CREATE MATERIALIZED VIEW cagg_named_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT time_bucket('1h', time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
|
||||
-- named ts
|
||||
CREATE MATERIALIZED VIEW cagg_named_ts_tz_origin WITH
|
||||
(timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
@ -1186,6 +1191,7 @@ CREATE MATERIALIZED VIEW cagg_named_all WITH
|
||||
SELECT time_bucket(bucket_width => '1h', ts => time, timezone => 'UTC', origin => '2001-01-03 01:23:45') AS bucket,
|
||||
avg(amount) as avg_amount
|
||||
FROM transactions GROUP BY 1 WITH NO DATA;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
-- Refreshing from the beginning (NULL) of a CAGG with variable time bucket and
|
||||
-- using an INTERVAL for the end timestamp (issue #5534)
|
||||
|
@ -81,13 +81,6 @@ Select max(temperature)
|
||||
from conditions
|
||||
group by time_bucket('1week', timec) , time_bucket('1month', timec), location WITH NO DATA;
|
||||
|
||||
--time_bucket using additional args
|
||||
CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS
|
||||
Select max(temperature)
|
||||
from conditions
|
||||
group by time_bucket( INTERVAL '5 minutes', timec, INTERVAL '-2.5 minutes') , location WITH NO DATA;
|
||||
|
||||
--time_bucket using non-const for first argument
|
||||
CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS
|
||||
|
@ -296,3 +296,638 @@ ORDER BY m1.location COLLATE "C" NULLS LAST, m1.timec DESC NULLS LAST, firsth NU
|
||||
LIMIT 10;
|
||||
|
||||
ROLLBACK;
|
||||
|
||||
-----
|
||||
-- Tests with time_bucket and offset/origin
|
||||
-----
|
||||
CREATE TABLE temperature (
|
||||
time timestamptz NOT NULL,
|
||||
value float
|
||||
);
|
||||
|
||||
SELECT create_hypertable('temperature', 'time');
|
||||
|
||||
INSERT INTO temperature VALUES ('2000-01-01 01:00:00'::timestamptz, 5);
|
||||
|
||||
CREATE TABLE temperature_wo_tz (
|
||||
time timestamp NOT NULL,
|
||||
value float
|
||||
);
|
||||
|
||||
SELECT create_hypertable('temperature_wo_tz', 'time');
|
||||
|
||||
INSERT INTO temperature_wo_tz VALUES ('2000-01-01 01:00:00'::timestamp, 5);
|
||||
|
||||
CREATE TABLE temperature_date (
|
||||
time date NOT NULL,
|
||||
value float
|
||||
);
|
||||
|
||||
SELECT create_hypertable('temperature_date', 'time');
|
||||
|
||||
INSERT INTO temperature_date VALUES ('2000-01-01 01:00:00'::timestamp, 5);
|
||||
|
||||
-- Integer based tables
|
||||
CREATE TABLE table_smallint (
|
||||
time smallint,
|
||||
data smallint
|
||||
);
|
||||
|
||||
CREATE TABLE table_int (
|
||||
time int,
|
||||
data int
|
||||
);
|
||||
|
||||
CREATE TABLE table_bigint (
|
||||
time bigint,
|
||||
data bigint
|
||||
);
|
||||
|
||||
SELECT create_hypertable('table_smallint', 'time', chunk_time_interval => 10);
|
||||
SELECT create_hypertable('table_int', 'time', chunk_time_interval => 10);
|
||||
SELECT create_hypertable('table_bigint', 'time', chunk_time_interval => 10);
|
||||
|
||||
CREATE OR REPLACE FUNCTION integer_now_smallint() returns smallint LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM table_smallint $$;
|
||||
CREATE OR REPLACE FUNCTION integer_now_int() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM table_int $$;
|
||||
CREATE OR REPLACE FUNCTION integer_now_bigint() returns bigint LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM table_bigint $$;
|
||||
|
||||
SELECT set_integer_now_func('table_smallint', 'integer_now_smallint');
|
||||
SELECT set_integer_now_func('table_int', 'integer_now_int');
|
||||
SELECT set_integer_now_func('table_bigint', 'integer_now_bigint');
|
||||
|
||||
INSERT INTO table_smallint VALUES(1,2);
|
||||
INSERT INTO table_int VALUES(1,2);
|
||||
INSERT INTO table_bigint VALUES(1,2);
|
||||
|
||||
---
|
||||
-- Tests with CAgg creation
|
||||
---
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, '30m'::interval), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_offset;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset2
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, "offset"=>'30m'::interval), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_offset2;
|
||||
|
||||
-- Variable buckets (timezone is provided) with offset are not supported at the moment
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset_ts
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, "offset"=>'30m'::interval, timezone=>'UTC'), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, '2000-01-01 01:00:00 PST'::timestamptz), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_origin;
|
||||
|
||||
-- Using named parameter
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin2
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, origin=>'2000-01-01 01:00:00 PST'::timestamptz), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_origin2;
|
||||
|
||||
-- Variable buckets (timezone is provided) with origin are not supported at the moment
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin_ts
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, origin=>'2000-01-01 01:00:00 PST'::timestamptz, timezone=>'UTC'), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
|
||||
-- Without named parameter
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin_ts2
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, 'UTC', '2000-01-01 01:00:00 PST'::timestamptz), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
-- Timestamp based CAggs
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_wo_tz
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time), max(value)
|
||||
FROM temperature_wo_tz
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin_ts_wo_tz
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, '2000-01-01 01:00:00'::timestamp), max(value)
|
||||
FROM temperature_wo_tz
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_origin_ts_wo_tz;
|
||||
|
||||
-- Variable buckets (timezone is provided) with origin are not supported at the moment
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin_ts_wo_tz2
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, origin=>'2000-01-01 01:00:00'::timestamp), max(value)
|
||||
FROM temperature_wo_tz
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset_wo_tz
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, "offset"=>'30m'::interval), max(value)
|
||||
FROM temperature_wo_tz
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_offset_wo_tz;
|
||||
|
||||
-- Date based CAggs
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_date
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 days', time), max(value)
|
||||
FROM temperature_date
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_date;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_date_origin
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 days', time, '2000-01-01'::date), max(value)
|
||||
FROM temperature_date
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_date_origin;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_date_origin2
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 days', time, origin=>'2000-01-01'::date), max(value)
|
||||
FROM temperature_date
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_date_origin2;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_date_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 days', time, "offset"=>'30m'::interval), max(value)
|
||||
FROM temperature_date
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_4_hours_date_offset;
|
||||
|
||||
-- Integer based CAggs
|
||||
CREATE MATERIALIZED VIEW cagg_smallint
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time), SUM(data) as value
|
||||
FROM table_smallint
|
||||
GROUP BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_smallint;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_smallint_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time, "offset"=>1::smallint), SUM(data) as value
|
||||
FROM table_smallint
|
||||
GROUP BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_smallint_offset;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_int
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time), SUM(data) as value
|
||||
FROM table_int
|
||||
GROUP BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_int;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_int_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time, "offset"=>1::int), SUM(data) as value
|
||||
FROM table_int
|
||||
GROUP BY 1;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_int_offset;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_bigint
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time), SUM(data) as value
|
||||
FROM table_bigint
|
||||
GROUP BY 1 WITH NO DATA;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_bigint;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_bigint_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time, "offset"=>1::bigint), SUM(data) as value
|
||||
FROM table_bigint
|
||||
GROUP BY 1 WITH NO DATA;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_bigint_offset;
|
||||
|
||||
-- Without named parameter
|
||||
CREATE MATERIALIZED VIEW cagg_bigint_offset2
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
|
||||
AS SELECT time_bucket('2', time, 1::bigint), SUM(data) as value
|
||||
FROM table_bigint
|
||||
GROUP BY 1 WITH NO DATA;
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_bucket_function ORDER BY 1 DESC LIMIT 1;
|
||||
DROP MATERIALIZED VIEW cagg_bigint_offset2;
|
||||
|
||||
-- Test invalid bucket definitions
|
||||
\set ON_ERROR_STOP 0
|
||||
-- Offset and origin at the same time is not allowed (function does not exists)
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset_and_origin
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, "offset"=>'30m'::interval, origin=>'2000-01-01 01:00:00 PST'::timestamptz), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Offset and origin at the same time is not allowed (function does exists but invalid parameter combination)
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset_and_origin
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, "offset"=>'30m'::interval, origin=>'2000-01-01 01:00:00 PST'::timestamptz, timezone=>'UTC'), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
---
|
||||
-- Tests with CAgg processing
|
||||
---
|
||||
|
||||
-- Check used timezone
|
||||
SHOW timezone;
|
||||
|
||||
-- Populate it
|
||||
INSERT INTO temperature
|
||||
SELECT time, 5
|
||||
FROM generate_series('2000-01-01 01:00:00 PST'::timestamptz,
|
||||
'2000-01-01 23:59:59 PST','1m') time;
|
||||
|
||||
INSERT INTO temperature
|
||||
SELECT time, 6
|
||||
FROM generate_series('2020-01-01 00:00:00 PST'::timestamptz,
|
||||
'2020-01-01 23:59:59 PST','1m') time;
|
||||
|
||||
-- Create CAggs
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, '30m'::interval), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Align origin with first value
|
||||
CREATE MATERIALIZED VIEW cagg_4_hours_origin
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('4 hour', time, '2000-01-01 01:00:00 PST'::timestamptz), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Query the CAggs and check that all buckets are materialized
|
||||
SELECT time_bucket('4 hour', time), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_4_hours;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours;
|
||||
|
||||
SELECT time_bucket('4 hour', time, '30m'::interval), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_offset SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
|
||||
SELECT time_bucket('4 hour', time, '2000-01-01 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_origin SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
|
||||
-- Update the last bucket and re-materialize
|
||||
INSERT INTO temperature values('2020-01-01 23:55:00 PST', 10);
|
||||
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_offset', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_origin', NULL, NULL);
|
||||
|
||||
SELECT * FROM cagg_4_hours;
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
|
||||
-- Check the real-time functionality
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours SET (timescaledb.materialized_only=false);
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_offset SET (timescaledb.materialized_only=false);
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_origin SET (timescaledb.materialized_only=false);
|
||||
|
||||
-- Check watermarks
|
||||
SELECT continuous_aggs_watermark.*, _timescaledb_functions.to_timestamp(watermark)
|
||||
FROM _timescaledb_catalog.continuous_aggs_watermark
|
||||
JOIN _timescaledb_catalog.continuous_agg USING (mat_hypertable_id)
|
||||
WHERE user_view_name LIKE 'cagg_4_hours%' ORDER BY mat_hypertable_id, watermark;
|
||||
|
||||
-- Insert new data
|
||||
INSERT INTO temperature values('2020-01-02 00:10:00 PST', 2222);
|
||||
INSERT INTO temperature values('2020-01-02 05:35:00 PST', 5555);
|
||||
INSERT INTO temperature values('2020-01-02 09:05:00 PST', 8888);
|
||||
|
||||
-- Watermark is at Thu Jan 02 00:00:00 2020 PST - all inserted tuples should be seen
|
||||
SELECT * FROM cagg_4_hours;
|
||||
|
||||
-- Watermark is at Thu Jan 02 00:30:00 2020 PST - only two inserted tuples should be seen
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
|
||||
-- Watermark is at Thu Jan 02 01:00:00 2020 PST - only two inserted tuples should be seen
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
|
||||
-- Update materialized data
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_offset', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_origin', NULL, NULL);
|
||||
RESET client_min_messages;
|
||||
|
||||
-- Query the CAggs and check that all buckets are materialized
|
||||
SELECT * FROM cagg_4_hours;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours;
|
||||
SELECT time_bucket('4 hour', time), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_offset SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
SELECT time_bucket('4 hour', time, '30m'::interval), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_origin SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
SELECT time_bucket('4 hour', time, '2000-01-01 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Test invalidations
|
||||
TRUNCATE temperature;
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_offset', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_origin', NULL, NULL);
|
||||
|
||||
INSERT INTO temperature
|
||||
SELECT time, 5
|
||||
FROM generate_series('2000-01-01 01:00:00 PST'::timestamptz,
|
||||
'2000-01-01 23:59:59 PST','1m') time;
|
||||
|
||||
INSERT INTO temperature
|
||||
SELECT time, 6
|
||||
FROM generate_series('2020-01-01 00:00:00 PST'::timestamptz,
|
||||
'2020-01-01 23:59:59 PST','1m') time;
|
||||
|
||||
INSERT INTO temperature values('2020-01-02 01:05:00+01', 2222);
|
||||
INSERT INTO temperature values('2020-01-02 01:35:00+01', 5555);
|
||||
INSERT INTO temperature values('2020-01-02 05:05:00+01', 8888);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_offset', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_4_hours_origin', NULL, NULL);
|
||||
RESET client_min_messages;
|
||||
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours SET (timescaledb.materialized_only=false);
|
||||
SELECT * FROM cagg_4_hours;
|
||||
SELECT time_bucket('4 hour', time), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_offset SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_offset SET (timescaledb.materialized_only=false);
|
||||
SELECT * FROM cagg_4_hours_offset;
|
||||
SELECT time_bucket('4 hour', time, '30m'::interval), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_origin SET (timescaledb.materialized_only=true);
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
ALTER MATERIALIZED VIEW cagg_4_hours_origin SET (timescaledb.materialized_only=false);
|
||||
SELECT * FROM cagg_4_hours_origin;
|
||||
SELECT time_bucket('4 hour', time, '2000-01-01 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
--- Test with variable width buckets (use February, since hourly origins are not supported with variable sized buckets)
|
||||
TRUNCATE temperature;
|
||||
INSERT INTO temperature
|
||||
SELECT time, 5
|
||||
FROM generate_series('2000-02-01 01:00:00 PST'::timestamptz,
|
||||
'2000-02-01 23:59:59 PST','1m') time;
|
||||
|
||||
INSERT INTO temperature
|
||||
SELECT time, 6
|
||||
FROM generate_series('2020-02-01 01:00:00 PST'::timestamptz,
|
||||
'2020-02-01 23:59:59 PST','1m') time;
|
||||
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1, 2, 3;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_1_year
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
|
||||
SELECT time_bucket('1 year', time), max(value)
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1, 2, 3;
|
||||
|
||||
---
|
||||
-- Tests with integer based hypertables
|
||||
---
|
||||
TRUNCATE table_int;
|
||||
|
||||
INSERT INTO table_int
|
||||
SELECT time, 5
|
||||
FROM generate_series(-50, 50) time;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_int
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS SELECT time_bucket('10', time), SUM(data) as value
|
||||
FROM table_int
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_int_offset
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS SELECT time_bucket('10', time, "offset"=>5), SUM(data) as value
|
||||
FROM table_int
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Compare bucketing results
|
||||
SELECT time_bucket('10', time), SUM(data) FROM table_int GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_int;
|
||||
|
||||
SELECT time_bucket('10', time, "offset"=>5), SUM(data) FROM table_int GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_int_offset;
|
||||
|
||||
-- Update table
|
||||
INSERT INTO table_int VALUES(51, 100);
|
||||
INSERT INTO table_int VALUES(100, 555);
|
||||
|
||||
-- Compare bucketing results
|
||||
SELECT time_bucket('10', time), SUM(data) FROM table_int GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_int;
|
||||
CALL refresh_continuous_aggregate('cagg_int', NULL, NULL);
|
||||
SELECT * FROM cagg_int;
|
||||
|
||||
SELECT time_bucket('10', time, "offset"=>5), SUM(data) FROM table_int GROUP BY 1 ORDER BY 1;
|
||||
SELECT * FROM cagg_int_offset; -- the value 100 is part of the already serialized bucket, so it should not be visible
|
||||
CALL refresh_continuous_aggregate('cagg_int_offset', NULL, NULL);
|
||||
SELECT * FROM cagg_int_offset;
|
||||
|
||||
-- Ensure everything was materialized
|
||||
ALTER MATERIALIZED VIEW cagg_int SET (timescaledb.materialized_only=true);
|
||||
ALTER MATERIALIZED VIEW cagg_int_offset SET (timescaledb.materialized_only=true);
|
||||
|
||||
SELECT * FROM cagg_int;
|
||||
SELECT * FROM cagg_int_offset;
|
||||
|
||||
-- Check that the refresh is properly aligned
|
||||
INSERT INTO table_int VALUES(114, 0);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL refresh_continuous_aggregate('cagg_int_offset', 110, 130);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT * FROM cagg_int_offset;
|
||||
SELECT time_bucket('10', time, "offset"=>5), SUM(data) FROM table_int GROUP BY 1 ORDER BY 1;
|
||||
|
||||
---
|
||||
-- Test with blocking a few broken configurations
|
||||
---
|
||||
\set ON_ERROR_STOP 0
|
||||
|
||||
-- Unfortunately '\set VERBOSITY verbose' cannot be used here to check the error details
|
||||
-- since it also prints the line number of the location, which is depended on the build
|
||||
|
||||
-- Variable sized buckets with origin are known to work incorrect. So, block usage for now.
|
||||
CREATE MATERIALIZED VIEW cagg_1_hour_variable_bucket_fixed_origin
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 year', time, origin=>'2000-01-01 01:05:00 UTC'::timestamptz, timezone=>'UTC') AS hour_bucket, max(value) AS max_value
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Variable due to the used timezone
|
||||
CREATE MATERIALIZED VIEW cagg_1_hour_variable_bucket_fixed_origin2
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 hour', time, origin=>'2000-01-01 01:05:00 UTC'::timestamptz, timezone=>'UTC') AS hour_bucket, max(value) AS max_value
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Variable with offset
|
||||
CREATE MATERIALIZED VIEW cagg_1_hour_variable_bucket_fixed_origin3
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 year', time, "offset"=>'5 minutes'::interval) AS hour_bucket, max(value) AS max_value
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
|
||||
-- Different time origin
|
||||
CREATE MATERIALIZED VIEW cagg_1_hour_origin
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 hour', time, origin=>'2000-01-02 01:00:00 PST'::timestamptz) AS hour_bucket, max(value) AS max_value
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_1_week_origin
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 week', hour_bucket, origin=>'2022-01-02 01:00:00 PST'::timestamptz) AS week_bucket, max(max_value) AS max_value
|
||||
FROM cagg_1_hour_origin
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Different time offset
|
||||
CREATE MATERIALIZED VIEW cagg_1_hour_offset
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 hour', time, "offset"=>'30m'::interval) AS hour_bucket, max(value) AS max_value
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_1_week_offset
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 week', hour_bucket, "offset"=>'35m'::interval) AS week_bucket, max(max_value) AS max_value
|
||||
FROM cagg_1_hour_offset
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Different integer offset
|
||||
CREATE MATERIALIZED VIEW cagg_int_offset_5
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS SELECT time_bucket('10', time, "offset"=>5) AS time, SUM(data) AS value
|
||||
FROM table_int
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_int_offset_10
|
||||
WITH (timescaledb.continuous, timescaledb.materialized_only=false)
|
||||
AS SELECT time_bucket('10', time, "offset"=>10) AS time, SUM(value) AS value
|
||||
FROM cagg_int_offset_5
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
DROP MATERIALIZED VIEW cagg_1_hour_origin;
|
||||
DROP MATERIALIZED VIEW cagg_1_hour_offset;
|
||||
DROP MATERIALIZED VIEW cagg_int_offset_5;
|
||||
|
||||
---
|
||||
-- CAGGs on CAGGs tests
|
||||
---
|
||||
CREATE MATERIALIZED VIEW cagg_1_hour_offset
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 hour', time, origin=>'2000-01-02 01:00:00 PST'::timestamptz) AS hour_bucket, max(value) AS max_value
|
||||
FROM temperature
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
CREATE MATERIALIZED VIEW cagg_1_week_offset
|
||||
WITH (timescaledb.continuous) AS
|
||||
SELECT time_bucket('1 week', hour_bucket, origin=>'2000-01-02 01:00:00 PST'::timestamptz) AS week_bucket, max(max_value) AS max_value
|
||||
FROM cagg_1_hour_offset
|
||||
GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Compare output
|
||||
SELECT * FROM cagg_1_week_offset;
|
||||
SELECT time_bucket('1 week', time, origin=>'2000-01-02 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
INSERT INTO temperature values('2030-01-01 05:05:00 PST', 22222);
|
||||
INSERT INTO temperature values('2030-01-03 05:05:00 PST', 55555);
|
||||
|
||||
-- Compare real-time functionality
|
||||
ALTER MATERIALIZED VIEW cagg_1_hour_offset SET (timescaledb.materialized_only=false);
|
||||
ALTER MATERIALIZED VIEW cagg_1_week_offset SET (timescaledb.materialized_only=false);
|
||||
|
||||
SELECT * FROM cagg_1_week_offset;
|
||||
SELECT time_bucket('1 week', time, origin=>'2000-01-02 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
-- Test refresh
|
||||
CALL refresh_continuous_aggregate('cagg_1_hour_offset', NULL, NULL);
|
||||
CALL refresh_continuous_aggregate('cagg_1_week_offset', NULL, NULL);
|
||||
|
||||
-- Everything should be now materailized
|
||||
ALTER MATERIALIZED VIEW cagg_1_hour_offset SET (timescaledb.materialized_only=false);
|
||||
ALTER MATERIALIZED VIEW cagg_1_week_offset SET (timescaledb.materialized_only=false);
|
||||
|
||||
SELECT * FROM cagg_1_week_offset;
|
||||
SELECT time_bucket('1 week', time, origin=>'2000-01-02 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
||||
TRUNCATE temperature;
|
||||
|
||||
SELECT * FROM cagg_1_week_offset;
|
||||
SELECT time_bucket('1 week', time, origin=>'2000-01-02 01:00:00 PST'::timestamptz), max(value) FROM temperature GROUP BY 1 ORDER BY 1;
|
||||
|
@ -301,12 +301,11 @@ SELECT * FROM cagg1;
|
||||
CREATE MATERIALIZED VIEW cagg2 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT') FROM metrics GROUP BY 1;
|
||||
SELECT * FROM cagg2;
|
||||
|
||||
-- custom origin
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
SELECT * FROM cagg3;
|
||||
|
||||
-- offset not supported atm
|
||||
-- custom origin - not supported due to variable size
|
||||
\set ON_ERROR_STOP 0
|
||||
CREATE MATERIALIZED VIEW cagg3 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', '2000-01-01'::timestamptz) FROM metrics GROUP BY 1;
|
||||
|
||||
-- offset - not supported due to variable size
|
||||
CREATE MATERIALIZED VIEW cagg4 WITH (timescaledb.continuous,timescaledb.materialized_only=true) AS SELECT time_bucket('1 month', time, 'PST8PDT', "offset":= INTERVAL '15 day') FROM metrics GROUP BY 1;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user