diff --git a/src/continuous_agg.c b/src/continuous_agg.c index b68f42a3e..5c6caa5ad 100644 --- a/src/continuous_agg.c +++ b/src/continuous_agg.c @@ -30,6 +30,29 @@ #include #endif +static const WithClauseDefinition continuous_aggregate_with_clause_def[] = { + [ContinuousEnabled] = { + .arg_name = "continuous", + .type_id = BOOLOID, + .default_val = BoolGetDatum(false), + }, + [ContinuousViewOptionRefreshInterval] = { + .arg_name = "refresh_interval", + .type_id = INTERVALOID, + }, + [ContinuousViewOptionRefreshLag] = { + .arg_name = "refresh_lag", + .type_id = TEXTOID, + }, +}; + +WithClauseResult * +ts_continuous_agg_with_clause_parse(const List *defelems) +{ + return ts_with_clauses_parse(defelems, + continuous_aggregate_with_clause_def, + TS_ARRAY_LEN(continuous_aggregate_with_clause_def)); +} static void init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int32 mat_hypertable_id) { diff --git a/src/continuous_agg.h b/src/continuous_agg.h index cba7829e7..958ec5df1 100644 --- a/src/continuous_agg.h +++ b/src/continuous_agg.h @@ -6,25 +6,38 @@ #ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_CONTIGUOUS_AGG_H #define TIMESCALEDB_TSL_CONTINUOUS_AGGS_CONTIGUOUS_AGG_H #include +#include #include +#include "with_clause_parser.h" +#include "compat.h" #define CAGGINVAL_TRIGGER_NAME "ts_cagg_invalidation_trigger" +typedef enum ContinuousAggViewOption +{ + ContinuousEnabled = 0, + ContinuousViewOptionRefreshLag, + ContinuousViewOptionRefreshInterval, +} ContinuousAggViewOption; + +extern TSDLLEXPORT WithClauseResult *ts_continuous_agg_with_clause_parse(const List *defelems); + typedef struct ContinuousAgg { FormData_continuous_agg data; } ContinuousAgg; -extern ContinuousAgg *ts_continuous_agg_find_by_view_name(const char *schema, const char *name); +extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char *schema, + const char *name); extern void ts_continuous_agg_drop_view_callback(ContinuousAgg *ca, const char *schema, const char *name); extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id); -extern bool ts_continuous_agg_is_user_view(FormData_continuous_agg *data, const char *schema, - const char *name); -extern bool ts_continuous_agg_is_partial_view(FormData_continuous_agg *data, const char *schema, - const char *name); +extern TSDLLEXPORT bool ts_continuous_agg_is_user_view(FormData_continuous_agg *data, + const char *schema, const char *name); +extern TSDLLEXPORT bool ts_continuous_agg_is_partial_view(FormData_continuous_agg *data, + const char *schema, const char *name); extern void ts_continuous_agg_rename_schema_name(char *old_schema, char *new_schema); extern void ts_continuous_agg_rename_view(char *old_schema, char *name, char *new_schema, diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 703459c46..48ec5a29d 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -188,11 +188,17 @@ error_no_default_fn_pg_enterprise(PG_FUNCTION_ARGS) } static bool -process_cagg_viewstmt_default(ViewStmt *stmt, const char *query_string, void *pstmt) +process_cagg_viewstmt_default(ViewStmt *stmt, const char *query_string, void *pstmt, + WithClauseResult *with_clause_options) { return error_no_default_fn_bool_void_community(); } - +static void +continuous_agg_update_options_default(ContinuousAgg *cagg, WithClauseResult *with_clause_options) +{ + error_no_default_fn_community(); + pg_unreachable(); +} /* * Define cross-module functions' default values: * If the submodule isn't activated, using one of the cm functions will throw an @@ -230,6 +236,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .finalize_agg_ffunc = error_no_default_fn_pg_community, .process_cagg_viewstmt = process_cagg_viewstmt_default, .continuous_agg_trigfn = error_no_default_fn_pg_community, + .continuous_agg_update_options = continuous_agg_update_options_default, }; TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default; diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 14fe44ddf..c25722a16 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -20,6 +20,8 @@ #include "export.h" #include "bgw/job.h" #include "process_utility.h" +#include "with_clause_parser.h" +#include "continuous_agg.h" /* * To define a cross-module function add it to this struct, add a default @@ -60,8 +62,11 @@ typedef struct CrossModuleFunctions PGFunction partialize_agg; PGFunction finalize_agg_sfunc; PGFunction finalize_agg_ffunc; - bool (*process_cagg_viewstmt)(ViewStmt *stmt, const char *query_string, void *pstmt); + bool (*process_cagg_viewstmt)(ViewStmt *stmt, const char *query_string, void *pstmt, + WithClauseResult *with_clause_options); PGFunction continuous_agg_trigfn; + void (*continuous_agg_update_options)(ContinuousAgg *cagg, + WithClauseResult *with_clause_options); } CrossModuleFunctions; extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions; diff --git a/src/process_utility.c b/src/process_utility.c index 0a7d80205..2f557b48d 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -2205,6 +2205,75 @@ process_altertable_start_table(ProcessUtilityArgs *args) } static void +process_altercontinuousagg_set_with(ContinuousAgg *cagg, const List *defelems) +{ + WithClauseResult *parse_results; + List *pg_options = NIL, *cagg_options = NIL; + + ts_with_clause_filter(defelems, &cagg_options, &pg_options); + if (list_length(pg_options) > 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("only timescaledb parameters allowed in WITH clause for continuous " + "aggregate"))); + + if (list_length(cagg_options) > 0) + { + parse_results = ts_continuous_agg_with_clause_parse(cagg_options); + ts_cm_functions->continuous_agg_update_options(cagg, parse_results); + } +} + +static bool +process_altertable_start_view(ProcessUtilityArgs *args) +{ + AlterTableStmt *stmt = (AlterTableStmt *) args->parsetree; + Oid view_relid = AlterTableLookupRelation(stmt, NoLock); + NameData view_name; + NameData view_schema; + ContinuousAgg *cagg; + ListCell *lc; + + if (!OidIsValid(view_relid)) + return false; + + namestrcpy(&view_name, get_rel_name(view_relid)); + namestrcpy(&view_schema, get_namespace_name(get_rel_namespace(view_relid))); + cagg = ts_continuous_agg_find_by_view_name(NameStr(view_schema), NameStr(view_name)); + + if (cagg == NULL) + return false; + + if (ts_continuous_agg_is_partial_view(&cagg->data, NameStr(view_schema), NameStr(view_name))) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter the internal view of a continuous aggregate"))); + + foreach (lc, stmt->cmds) + { + AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lc); + + switch (cmd->subtype) + { + case AT_SetRelOptions: + if (!IsA(cmd->def, List)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("expected set options to contain a list"))); + process_altercontinuousagg_set_with(cagg, (List *) cmd->def); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter only SET options of a continuous " + "aggregate"))); + } + } + /* All commands processed by us, nothing for postgres to do.*/ + return true; +} + +static bool process_altertable_start(ProcessUtilityArgs *args) { AlterTableStmt *stmt = (AlterTableStmt *) args->parsetree; @@ -2213,9 +2282,11 @@ process_altertable_start(ProcessUtilityArgs *args) { case OBJECT_TABLE: process_altertable_start_table(args); - break; + return false; + case OBJECT_VIEW: + return process_altertable_start_view(args); default: - break; + return false; } } @@ -2540,18 +2611,9 @@ process_create_trigger_end(Node *parsetree) foreach_chunk_relation(stmt->relation, create_trigger_chunk, stmt); } -/* TODO need to decide on additional options for create view for continuous aggregate */ -typedef enum CaggViewOptions -{ - CaggViewState = 0 -} CaggViewOptions; - static bool process_viewstmt(ProcessUtilityArgs *args) { - WithClauseDefinition caggview_with_clauses[] = { - [CaggViewState] = { .arg_name = "continuous_agg", .type_id = TEXTOID, .default_val = 'f' } - }; WithClauseResult *parse_results; bool is_cagg = false; Node *parsetree = args->parsetree; @@ -2562,23 +2624,26 @@ process_viewstmt(ProcessUtilityArgs *args) ts_with_clause_filter(stmt->options, &cagg_options, &pg_options); if (cagg_options) { - parse_results = ts_with_clauses_parse(cagg_options, - caggview_with_clauses, - TS_ARRAY_LEN(caggview_with_clauses)); - if (parse_results[CaggViewState].is_default == false) - is_cagg = true; + parse_results = ts_continuous_agg_with_clause_parse(cagg_options); + is_cagg = DatumGetBool(parse_results[ContinuousEnabled].parsed); } + if (!is_cagg) return false; + if (pg_options != NIL) - elog(ERROR, - "only timescaledb namespace parameters allowed in WITH clause for continuous " - "aggregate"); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("only timescaledb parameters allowed in WITH clause for continuous " + "aggregate"))); #if !PG96 - return ts_cm_functions->process_cagg_viewstmt(stmt, args->query_string, args->pstmt); + return ts_cm_functions->process_cagg_viewstmt(stmt, + args->query_string, + args->pstmt, + parse_results); #else - return ts_cm_functions->process_cagg_viewstmt(stmt, args->query_string, NULL); + return ts_cm_functions->process_cagg_viewstmt(stmt, args->query_string, NULL, parse_results); #endif } @@ -2652,7 +2717,7 @@ process_ddl_command_start(ProcessUtilityArgs *args) handled = process_truncate(args); break; case T_AlterTableStmt: - process_altertable_start(args); + handled = process_altertable_start(args); break; case T_RenameStmt: process_rename(args); diff --git a/src/with_clause_parser.c b/src/with_clause_parser.c index 20e253c98..125127560 100644 --- a/src/with_clause_parser.c +++ b/src/with_clause_parser.c @@ -141,7 +141,7 @@ parse_arg(WithClauseDefinition arg, DefElem *def) PG_TRY(); { - val = OidInputFunctionCall(in_fn, value, typIOParam, 0); + val = OidInputFunctionCall(in_fn, value, typIOParam, -1); } PG_CATCH(); { diff --git a/src/with_clause_parser.h b/src/with_clause_parser.h index c14fd511d..b68a6193f 100644 --- a/src/with_clause_parser.h +++ b/src/with_clause_parser.h @@ -3,12 +3,16 @@ * Please see the included NOTICE for copyright information and * LICENSE-APACHE for a copy of the license. */ +#ifndef TIMESCALEDB_WITH_CLAUSE_PARSER_H +#define TIMESCALEDB_WITH_CLAUSE_PARSER_H #include #include #include #include +#include "compat.h" + typedef struct WithClauseDefinition { const char *arg_name; @@ -22,8 +26,9 @@ typedef struct WithClauseResult Datum parsed; } WithClauseResult; -extern void ts_with_clause_filter(const List *def_elems, List **within_namespace, - List **not_within_namespace); +extern TSDLLEXPORT void ts_with_clause_filter(const List *def_elems, List **within_namespace, + List **not_within_namespace); -extern WithClauseResult *ts_with_clauses_parse(const List *def_elems, - const WithClauseDefinition *args, Size nargs); +extern TSDLLEXPORT WithClauseResult * +ts_with_clauses_parse(const List *def_elems, const WithClauseDefinition *args, Size nargs); +#endif /* TIMESCALEDB_WITH_CLAUSE_PARSER_H */ diff --git a/tsl/src/continuous_aggs/CMakeLists.txt b/tsl/src/continuous_aggs/CMakeLists.txt index a272d9883..1a1c6fa54 100644 --- a/tsl/src/continuous_aggs/CMakeLists.txt +++ b/tsl/src/continuous_aggs/CMakeLists.txt @@ -3,5 +3,6 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/insert.c ${CMAKE_CURRENT_SOURCE_DIR}/job.c ${CMAKE_CURRENT_SOURCE_DIR}/materialize.c + ${CMAKE_CURRENT_SOURCE_DIR}/options.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/continuous_aggs/cagg_create.c b/tsl/src/continuous_aggs/cagg_create.c index 010753cd2..45dc69703 100644 --- a/tsl/src/continuous_aggs/cagg_create.c +++ b/tsl/src/continuous_aggs/cagg_create.c @@ -41,6 +41,7 @@ #include #include #include +#include #include "cagg_create.h" @@ -53,6 +54,7 @@ #include "continuous_aggs/job.h" #include "dimension.h" #include "continuous_agg.h" +#include "options.h" #define FINALFN "finalize_agg" #define PARTIALFN "partialize_agg" @@ -144,10 +146,11 @@ typedef struct FinalizeQueryInfo typedef struct CAggTimebucketInfo { - int32 htid; /* hypertable id */ - Oid htoid; /* hypertable oid */ - AttrNumber htpartcolno; /*primary partitioning column */ - /* This should also be the column used by time_bucket */ + int32 htid; /* hypertable id */ + Oid htoid; /* hypertable oid */ + AttrNumber htpartcolno; /*primary partitioning column */ + /* This should also be the column used by time_bucket */ + Oid htpartcoltype; int64 htpartcol_interval_len; /* interval length setting for primary partitioning column */ int64 bucket_width; /*bucket_width of time_bucket */ Index sortref; /*sortref index of the group by clause for @@ -176,6 +179,7 @@ static Query *mattablecolumninfo_get_partial_select_query(MatTableColumnInfo *ma static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id, Oid hypertable_oid, AttrNumber hypertable_partition_colno, + Oid hypertable_partition_coltype, int64 hypertable_partition_col_interval); static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList); @@ -187,8 +191,8 @@ static Query *finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matco /* create a entry for the materialization table in table CONTINUOUS_AGGS */ static void create_cagg_catlog_entry(int32 matht_id, int32 rawht_id, char *user_schema, char *user_view, - char *partial_schema, char *partial_view, int64 bucket_width, int32 job_id, - Query *userquery_parse) + char *partial_schema, char *partial_view, int64 bucket_width, + int64 refresh_lag, int32 job_id, Query *userquery_parse) { Catalog *catalog = ts_catalog_get(); Relation rel; @@ -219,8 +223,7 @@ create_cagg_catlog_entry(int32 matht_id, int32 rawht_id, char *user_schema, char NameGetDatum(&partial_viewnm); values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)] = bucket_width; values[AttrNumberGetAttrOffset(Anum_continuous_agg_job_id)] = job_id; - values[AttrNumberGetAttrOffset(Anum_continuous_agg_refresh_lag)] = - ts_continuous_agg_job_get_default_refresh_lag(bucket_width); + values[AttrNumberGetAttrOffset(Anum_continuous_agg_refresh_lag)] = refresh_lag; values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_query)] = CStringGetTextDatum(userview_query); @@ -449,12 +452,13 @@ get_timebucketfnoid() /* initialize caggtimebucket */ static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id, Oid hypertable_oid, - AttrNumber hypertable_partition_colno, + AttrNumber hypertable_partition_colno, Oid hypertable_partition_coltype, int64 hypertable_partition_col_interval) { src->htid = hypertable_id; src->htoid = hypertable_oid; src->htpartcolno = hypertable_partition_colno; + src->htpartcoltype = hypertable_partition_coltype; src->htpartcol_interval_len = hypertable_partition_col_interval; src->bucket_width = 0; /*invalid value */ } @@ -531,6 +535,14 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar } } +static int64 +get_refresh_lag(Oid column_type, int64 bucket_width, WithClauseResult *with_clause_options) +{ + if (with_clause_options[ContinuousViewOptionRefreshLag].is_default) + return bucket_width * 2; + return continuous_agg_parse_refresh_lag(column_type, with_clause_options); +} + static bool cagg_agg_validate(Node *node, void *context) { @@ -668,6 +680,7 @@ cagg_validate_query(Query *query) ht->fd.id, ht->main_table_relid, part_dimension->column_attno, + part_dimension->fd.column_type, part_dimension->fd.interval_length); } ts_cache_release(hcache); @@ -1352,7 +1365,8 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist, * panquery is the output of running parse_anlayze( ViewStmt->query) */ static void -cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht) +cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht, + WithClauseResult *with_clause_options) { ObjectAddress mataddress; char relnamebuf[NAMEDATALEN]; @@ -1368,6 +1382,11 @@ cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht) int32 job_id; char trigarg[NAMEDATALEN]; int ret; + Interval *refresh_interval = + DatumGetIntervalP(with_clause_options[ContinuousViewOptionRefreshInterval].parsed); + int64 refresh_lag = get_refresh_lag(origquery_ht->htpartcoltype, + origquery_ht->bucket_width, + with_clause_options); mattablecolumninfo_init(&mattblinfo, NIL, NIL, panquery->groupClause); finalizequery_init(&finalqinfo, panquery, stmt->aliases, &mattblinfo); @@ -1408,7 +1427,8 @@ cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht) Assert(mat_rel != NULL); /* Step 4a register the BGW job */ - job_id = ts_continuous_agg_job_add(origquery_ht->htid, origquery_ht->bucket_width); + job_id = + ts_continuous_agg_job_add(origquery_ht->htid, origquery_ht->bucket_width, refresh_interval); /* Step 4 add catalog table entry for the objects we just created */ nspid = RangeVarGetCreationNamespace(stmt->view); @@ -1419,6 +1439,7 @@ cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht) part_rel->schemaname, part_rel->relname, origquery_ht->bucket_width, + refresh_lag, job_id, panquery); @@ -1438,7 +1459,8 @@ cagg_create(ViewStmt *stmt, Query *panquery, CAggTimebucketInfo *origquery_ht) * step 2: create underlying tables and views */ bool -tsl_process_continuous_agg_viewstmt(ViewStmt *stmt, const char *query_string, void *pstmt) +tsl_process_continuous_agg_viewstmt(ViewStmt *stmt, const char *query_string, void *pstmt, + WithClauseResult *with_clause_options) { Query *query = NULL; CAggTimebucketInfo timebucket_exprinfo; @@ -1468,6 +1490,6 @@ tsl_process_continuous_agg_viewstmt(ViewStmt *stmt, const char *query_string, vo return true; } timebucket_exprinfo = cagg_validate_query(query); - cagg_create(stmt, query, &timebucket_exprinfo); + cagg_create(stmt, query, &timebucket_exprinfo, with_clause_options); return true; } diff --git a/tsl/src/continuous_aggs/cagg_create.h b/tsl/src/continuous_aggs/cagg_create.h index fbee3b482..df60cd404 100644 --- a/tsl/src/continuous_aggs/cagg_create.h +++ b/tsl/src/continuous_aggs/cagg_create.h @@ -8,5 +8,8 @@ #include #include -bool tsl_process_continuous_agg_viewstmt(ViewStmt *stmt, const char *query_string, void *pstmt); +#include "with_clause_parser.h" + +bool tsl_process_continuous_agg_viewstmt(ViewStmt *stmt, const char *query_string, void *pstmt, + WithClauseResult *with_clause_options); #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_CAGG_CREATE_H */ diff --git a/tsl/src/continuous_aggs/job.c b/tsl/src/continuous_aggs/job.c index 43c4bb4f9..214ab7587 100644 --- a/tsl/src/continuous_aggs/job.c +++ b/tsl/src/continuous_aggs/job.c @@ -85,7 +85,7 @@ continuous_agg_job_get_default_schedule_interval(int32 raw_table_id, int64 bucke } int32 -ts_continuous_agg_job_add(int32 raw_table_id, int64 bucket_width) +ts_continuous_agg_job_add(int32 raw_table_id, int64 bucket_width, Interval *refresh_interval) { NameData application_name; NameData job_type; @@ -94,14 +94,16 @@ ts_continuous_agg_job_add(int32 raw_table_id, int64 bucket_width) namestrcpy(&job_type, "continuous_aggregate"); namestrcpy(&application_name, "Continuous Aggregate Background Job"); - job_id = - ts_bgw_job_insert_relation(&application_name, - &job_type, - continuous_agg_job_get_default_schedule_interval(raw_table_id, - bucket_width), - DEFAULT_MAX_RUNTIME, - DEFAULT_MAX_RETRIES, - DEFAULT_RETRY_PERIOD); + if (refresh_interval == NULL) + refresh_interval = + continuous_agg_job_get_default_schedule_interval(raw_table_id, bucket_width); + + job_id = ts_bgw_job_insert_relation(&application_name, + &job_type, + refresh_interval, + DEFAULT_MAX_RUNTIME, + DEFAULT_MAX_RETRIES, + DEFAULT_RETRY_PERIOD); return job_id; } diff --git a/tsl/src/continuous_aggs/job.h b/tsl/src/continuous_aggs/job.h index a6d3970ce..13cdd060e 100644 --- a/tsl/src/continuous_aggs/job.h +++ b/tsl/src/continuous_aggs/job.h @@ -8,14 +8,10 @@ #include #include -int32 ts_continuous_agg_job_add(int32 raw_table_id, int64 bucket_width); +#include "bgw/job.h" + +int32 ts_continuous_agg_job_add(int32 raw_table_id, int64 bucket_width, Interval *refresh_interval); int32 ts_continuous_agg_job_find_materializtion_by_job_id(int32 job_id); -static inline int64 -ts_continuous_agg_job_get_default_refresh_lag(int64 bucket_width) -{ - return bucket_width * 2; -} - #endif diff --git a/tsl/src/continuous_aggs/options.c b/tsl/src/continuous_aggs/options.c new file mode 100644 index 000000000..371ee1c09 --- /dev/null +++ b/tsl/src/continuous_aggs/options.c @@ -0,0 +1,123 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include + +#include "options.h" +#include "continuous_agg.h" +#include "hypertable_cache.h" +#include "cache.h" +#include "scan_iterator.h" +#include "job.h" + +static inline int64 +parse_int_lag(const char *value, int64 min, int64 max) +{ + int64 result; + if (!scanint8(value, true, &result)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("parameter timescaledb.refresh_lag must be an integer for " + "hypertables with integer time values"))); + if (result < min || result > max) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("timescaledb.refresh_lag out of range"))); + return result; +} + +int64 +continuous_agg_parse_refresh_lag(Oid column_type, WithClauseResult *with_clause_options) +{ + char *value; + Datum interval; + Oid in_fn; + Oid typIOParam; + + Assert(!with_clause_options[ContinuousViewOptionRefreshLag].is_default); + + value = TextDatumGetCString(with_clause_options[ContinuousViewOptionRefreshLag].parsed); + + switch (column_type) + { + case INT2OID: + return parse_int_lag(value, PG_INT16_MIN, PG_INT16_MAX); + case INT4OID: + return parse_int_lag(value, PG_INT32_MIN, PG_INT32_MAX); + case INT8OID: + return parse_int_lag(value, PG_INT64_MIN, PG_INT64_MAX); + case TIMESTAMPTZOID: + case TIMESTAMPOID: + case DATEOID: + getTypeInputInfo(INTERVALOID, &in_fn, &typIOParam); + Assert(OidIsValid(in_fn)); + interval = OidInputFunctionCall(in_fn, value, typIOParam, -1); + return ts_interval_value_to_internal(interval, INTERVALOID); + default: + elog(ERROR, "unknown time type"); + } +} + +static void +update_refresh_lag(ContinuousAgg *agg, int64 new_lag) +{ + ScanIterator iterator = + ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext); + iterator.ctx.index = catalog_get_index(ts_catalog_get(), CONTINUOUS_AGG, CONTINUOUS_AGG_PKEY); + + ts_scan_iterator_scan_key_init(&iterator, + Anum_continuous_agg_pkey_mat_hypertable_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(agg->data.mat_hypertable_id)); + + ts_scanner_foreach(&iterator) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); + bool nulls[Natts_continuous_agg]; + Datum values[Natts_continuous_agg]; + bool repl[Natts_continuous_agg] = { false }; + HeapTuple new; + + heap_deform_tuple(ti->tuple, ti->desc, values, nulls); + + repl[AttrNumberGetAttrOffset(Anum_continuous_agg_refresh_lag)] = true; + values[AttrNumberGetAttrOffset(Anum_continuous_agg_refresh_lag)] = Int64GetDatum(new_lag); + + new = heap_modify_tuple(ti->tuple, ti->desc, values, nulls, repl); + + ts_catalog_update(ti->scanrel, new); + break; + } + ts_scan_iterator_close(&iterator); +} + +void +continuous_agg_update_options(ContinuousAgg *agg, WithClauseResult *with_clause_options) +{ + if (!with_clause_options[ContinuousEnabled].is_default) + elog(ERROR, "cannot disable continuous aggregates"); + + if (!with_clause_options[ContinuousViewOptionRefreshLag].is_default) + { + Cache *hcache = ts_hypertable_cache_pin(); + Hypertable *ht = ts_hypertable_cache_get_entry_by_id(hcache, agg->data.raw_hypertable_id); + Dimension *time_dimension = hyperspace_get_open_dimension(ht->space, 0); + int64 lag = + continuous_agg_parse_refresh_lag(time_dimension->fd.column_type, with_clause_options); + update_refresh_lag(agg, lag); + ts_cache_release(hcache); + } + + if (!with_clause_options[ContinuousViewOptionRefreshInterval].is_default) + { + BgwJob *job = ts_bgw_job_find(agg->data.job_id, CurrentMemoryContext, true); + job->fd.schedule_interval = + *DatumGetIntervalP(with_clause_options[ContinuousViewOptionRefreshInterval].parsed); + ts_bgw_job_update_by_id(agg->data.job_id, job); + } +} diff --git a/tsl/src/continuous_aggs/options.h b/tsl/src/continuous_aggs/options.h new file mode 100644 index 000000000..06cf53031 --- /dev/null +++ b/tsl/src/continuous_aggs/options.h @@ -0,0 +1,20 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_OPTIONS_H +#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_OPTIONS_H +#include +#include + +#include "with_clause_parser.h" +#include "continuous_agg.h" + +extern int64 continuous_agg_parse_refresh_lag(Oid column_type, + WithClauseResult *with_clause_options); + +extern void continuous_agg_update_options(ContinuousAgg *cagg, + WithClauseResult *with_clause_options); + +#endif diff --git a/tsl/src/init.c b/tsl/src/init.c index 69a7d3e03..fc79a4a13 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -23,6 +23,8 @@ #include "continuous_aggs/cagg_create.h" #include "continuous_aggs/insert.h" #include "continuous_aggs/materialize.h" +#include "continuous_aggs/options.h" +#include "process_utility.h" #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; @@ -77,6 +79,7 @@ CrossModuleFunctions tsl_cm_functions = { .finalize_agg_ffunc = tsl_finalize_agg_ffunc, .process_cagg_viewstmt = tsl_process_continuous_agg_viewstmt, .continuous_agg_trigfn = continuous_agg_trigfn, + .continuous_agg_update_options = continuous_agg_update_options, }; TS_FUNCTION_INFO_V1(ts_module_init); diff --git a/tsl/test/expected/bgw_continuous_aggs.out b/tsl/test/expected/bgw_continuous_aggs.out index 58a758399..e028fea52 100644 --- a/tsl/test/expected/bgw_continuous_aggs.out +++ b/tsl/test/expected/bgw_continuous_aggs.out @@ -79,7 +79,7 @@ NOTICE: adding not-null constraint to column "time" (1 row) CREATE VIEW test_continuous_agg_view - WITH ( timescaledb.continuous_agg = 'start') + WITH ( timescaledb.continuous) AS SELECT time_bucket('2', time), SUM(data) as value FROM test_continuous_agg_table GROUP BY 1; diff --git a/tsl/test/expected/contaggviews.out b/tsl/test/expected/contaggviews.out index e55174733..f27f2af8c 100644 --- a/tsl/test/expected/contaggviews.out +++ b/tsl/test/expected/contaggviews.out @@ -43,7 +43,7 @@ insert into foo values( 2 , 14 , 20); insert into foo values( 2 , 15 , 20); insert into foo values( 2 , 16 , 20); create or replace view mat_m1( a, countb ) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select a, count(b) from foo @@ -108,7 +108,7 @@ insert into conditions values ( '2010-01-02 09:00:00-08', 'NYC', 65, 45); insert into conditions values ( '2018-11-01 09:00:00-08', 'NYC', 45, 35); insert into conditions values ( '2018-11-02 09:00:00-08', 'NYC', 35, 15); create or replace view mat_m1( timec, minl, sumt , sumh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) from conditions @@ -148,7 +148,6 @@ order by 1; -- TEST3 -- -- drop on table conditions should cascade to materialized mat_v1 --- Oid used in timescaledb_catalog table from catalog table -- TODO drop table conditions cascade; NOTICE: drop cascades to view _timescaledb_internal.ts_internal_mat_m1view NOTICE: drop cascades to 2 other objects @@ -174,7 +173,7 @@ insert into conditions values ( '2018-11-01 09:00:00-08', 'NYC', 45, 35); insert into conditions values ( '2018-11-02 09:00:00-08', 'NYC', 35, 15); insert into conditions values ( '2018-11-03 09:00:00-08', 'NYC', 35, 25); create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -216,12 +215,11 @@ order by time_bucket('1week', timec); --materialized view with group by clause + expression in SELECT -- use previous data from conditions --drop only the view. --- TODO catalog entry should get deleted? -- apply where clause on result of mat_m1 -- drop view mat_m1 cascade; NOTICE: drop cascades to 2 other objects create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -267,7 +265,7 @@ order by time_bucket('1week', timec); drop view mat_m1 cascade; NOTICE: drop cascades to 2 other objects create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -333,7 +331,7 @@ insert into conditions select generate_series('2018-11-01 00:00'::timestamp, '2018-12-15 00:00'::timestamp, '1 day'), 'LA', 73, 55, 71, 28; --drop view mat_m1 cascade; create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -548,7 +546,7 @@ select table_name from create_hypertable( 'conditions', 'timec'); --no data in hyper table on purpose so that CASCADE is not required because of chunks create or replace view mat_drop_test( timec, minl, sumt , sumh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) from conditions @@ -652,3 +650,95 @@ select count(*) from pg_class where relname = 'mat_drop_test'; 0 (1 row) +--TEST With options +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); +select table_name from create_hypertable( 'conditions', 'timec'); + table_name +------------ + conditions +(1 row) + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5 hours', timescaledb.refresh_interval = '1h') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); +NOTICE: adding not-null constraint to column "time_partition_col" +SELECT schedule_interval FROM _timescaledb_config.bgw_job; + schedule_interval +------------------- + @ 1 hour +(1 row) + +SELECT _timescaledb_internal.to_interval(refresh_lag) FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; + to_interval +------------- + @ 5 hours +(1 row) + +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '6 h', timescaledb.refresh_interval = '2h'); +SELECT _timescaledb_internal.to_interval(refresh_lag) FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; + to_interval +------------- + @ 6 hours +(1 row) + +SELECT schedule_interval FROM _timescaledb_config.bgw_job; + schedule_interval +------------------- + @ 2 hours +(1 row) + +DROP TABLE conditions CASCADE; +NOTICE: drop cascades to view _timescaledb_internal.ts_internal_mat_with_testview +--test WITH using a hypertable with an integer time dimension +CREATE TABLE conditions ( + timec INT NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); +select table_name from create_hypertable( 'conditions', 'timec', chunk_time_interval=> 100); + table_name +------------ + conditions +(1 row) + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '500', timescaledb.refresh_interval = '2h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); +NOTICE: adding not-null constraint to column "time_partition_col" +SELECT schedule_interval FROM _timescaledb_config.bgw_job; + schedule_interval +------------------- + @ 2 hours +(1 row) + +SELECT refresh_lag FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; + refresh_lag +------------- + 500 +(1 row) + +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '100'); +SELECT refresh_lag FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; + refresh_lag +------------- + 100 +(1 row) + diff --git a/tsl/test/expected/contaggviews_ddl-10.out b/tsl/test/expected/contaggviews_ddl-10.out index f40d1f7eb..1283eda27 100644 --- a/tsl/test/expected/contaggviews_ddl-10.out +++ b/tsl/test/expected/contaggviews_ddl-10.out @@ -19,7 +19,7 @@ select table_name from create_hypertable('conditions', 'timec'); -- check that GRANTS work correctly \c :TEST_DBNAME :ROLE_SUPERUSER -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum( temperature ), min(location) from conditions @@ -49,7 +49,7 @@ NOTICE: adding not-null constraint to column "time" (3,public,foo,t) (1 row) -CREATE VIEW rename_test WITH ( timescaledb.continuous_agg = 'start') +CREATE VIEW rename_test WITH ( timescaledb.continuous) AS SELECT time_bucket('1week', time), COUNT(data) FROM foo GROUP BY 1; diff --git a/tsl/test/expected/contaggviews_ddl-11.out b/tsl/test/expected/contaggviews_ddl-11.out index 219b478e7..0202435e6 100644 --- a/tsl/test/expected/contaggviews_ddl-11.out +++ b/tsl/test/expected/contaggviews_ddl-11.out @@ -19,7 +19,7 @@ select table_name from create_hypertable('conditions', 'timec'); -- check that GRANTS work correctly \c :TEST_DBNAME :ROLE_SUPERUSER -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum( temperature ), min(location) from conditions @@ -49,7 +49,7 @@ NOTICE: adding not-null constraint to column "time" (3,public,foo,t) (1 row) -CREATE VIEW rename_test WITH ( timescaledb.continuous_agg = 'start') +CREATE VIEW rename_test WITH ( timescaledb.continuous) AS SELECT time_bucket('1week', time), COUNT(data) FROM foo GROUP BY 1; diff --git a/tsl/test/expected/contaggviews_ddl-9.6.out b/tsl/test/expected/contaggviews_ddl-9.6.out index f40d1f7eb..1283eda27 100644 --- a/tsl/test/expected/contaggviews_ddl-9.6.out +++ b/tsl/test/expected/contaggviews_ddl-9.6.out @@ -19,7 +19,7 @@ select table_name from create_hypertable('conditions', 'timec'); -- check that GRANTS work correctly \c :TEST_DBNAME :ROLE_SUPERUSER -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum( temperature ), min(location) from conditions @@ -49,7 +49,7 @@ NOTICE: adding not-null constraint to column "time" (3,public,foo,t) (1 row) -CREATE VIEW rename_test WITH ( timescaledb.continuous_agg = 'start') +CREATE VIEW rename_test WITH ( timescaledb.continuous) AS SELECT time_bucket('1week', time), COUNT(data) FROM foo GROUP BY 1; diff --git a/tsl/test/expected/contaggviews_neg.out b/tsl/test/expected/contaggviews_neg.out index f480bb936..c845edb78 100644 --- a/tsl/test/expected/contaggviews_neg.out +++ b/tsl/test/expected/contaggviews_neg.out @@ -10,7 +10,7 @@ CREATE TABLE conditions ( temperature integer NULL, humidity DOUBLE PRECISION NULL, timemeasure TIMESTAMPTZ, - timeinterval INTERVAL + timeinterval INTERVAL ); select table_name from create_hypertable( 'conditions', 'timec'); table_name @@ -18,76 +18,76 @@ select table_name from create_hypertable( 'conditions', 'timec'); conditions (1 row) -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start', timescaledb.myfill = 1) +create view mat_m1 WITH ( timescaledb.continuous, timescaledb.myfill = 1) as select location , min(temperature) -from conditions +from conditions group by time_bucket('1d', timec), location; ERROR: unrecognized parameter "timescaledb.myfill" --valid PG option -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start', check_option = LOCAL ) +create view mat_m1 WITH ( timescaledb.continuous, check_option = LOCAL ) as select * from conditions , matt1; -ERROR: only timescaledb namespace parameters allowed in WITH clause for continuous aggregate +ERROR: only timescaledb parameters allowed in WITH clause for continuous aggregate -- join multiple tables -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select location, count(*) from conditions , mat_t1 where conditions.location = mat_t1.c group by location; ERROR: only 1 hypertable is permitted in SELECT query permitted for continuous aggregate -- LATERAL multiple tables -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select location, count(*) from conditions, LATERAL (Select * from mat_t1 where c = conditions.location) q group by location; ERROR: only 1 hypertable is permitted in SELECT query permitted for continuous aggregate --non-hypertable -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select a, count(*) from mat_t1 group by a; ERROR: can create continuous aggregate only on hypertables -- no group by -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select count(*) from conditions ; ERROR: SELECT query for continuous aggregate should have at least 1 aggregate function and a GROUP BY clause with time_bucket -- no time_bucket in group by -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select count(*) from conditions group by location; ERROR: time_bucket function missing from GROUP BY clause for continuous aggregate query -- with valid query in a CTE -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS with m1 as ( Select location, count(*) from conditions - group by time_bucket('1week', timec) , location) + group by time_bucket('1week', timec) , location) select * from m1; ERROR: invalid SELECT query for continuous aggregate --with DISTINCT ON -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select distinct on ( location ) count(*) from conditions group by location, time_bucket('1week', timec) ; ERROR: invalid SELECT query for continuous aggregate --aggregate with DISTINCT -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select time_bucket('1week', timec), count(location) , sum(distinct temperature) from conditions group by time_bucket('1week', timec) , location; ERROR: aggregates with FILTER / DISTINCT / ORDER BY are not supported for continuous aggregate query --aggregate with FILTER -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select time_bucket('1week', timec), sum(temperature) filter ( where humidity > 20 ) from conditions group by time_bucket('1week', timec) , location; ERROR: aggregates with FILTER / DISTINCT / ORDER BY are not supported for continuous aggregate query -- aggregate with filter in having clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select time_bucket('1week', timec), max(temperature) from conditions @@ -95,65 +95,65 @@ from conditions having sum(temperature) filter ( where humidity > 20 ) > 50; ERROR: aggregates with FILTER / DISTINCT / ORDER BY are not supported for continuous aggregate query -- time_bucket on non partitioning column of hypertable -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket('1week', timemeasure) , location; ERROR: time_bucket function for continuous aggregate query should be called on the dimension column of the hypertable --time_bucket on expression -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket('1week', timec+ '10 minutes'::interval) , location; ERROR: time_bucket function for continuous aggregate query should be called on the dimension column of the hypertable --multiple time_bucket functions -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket('1week', timec) , time_bucket('1month', timec), location; ERROR: multiple time_bucket functions not permitted in continuous aggregate query --time_bucket using additional args -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket( INTERVAL '5 minutes', timec, INTERVAL '-2.5 minutes') , location; ERROR: time_bucket function for continuous aggregate query cannot use optional arguments --time_bucket using non-const for first argument -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket( timeinterval, timec) , location; ERROR: first argument to time_bucket function should be a constant for continuous aggregate query -- ordered set aggr -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select mode() within group( order by humidity) from conditions group by time_bucket('1week', timec) ; ERROR: aggregates with FILTER / DISTINCT / ORDER BY are not supported for continuous aggregate query --window function -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select avg(temperature) over( order by humidity) from conditions ; ERROR: invalid SELECT query for continuous aggregate --aggregate without combine function -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select json_agg(location) +Select json_agg(location) from conditions group by time_bucket('1week', timec) , location; ERROR: aggregates which are not parallelizable are not supported by continuous aggregate query ; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature), array_agg(location) +Select sum(humidity), avg(temperature), array_agg(location) from conditions group by time_bucket('1week', timec) , location; ERROR: aggregates which are not parallelizable are not supported by continuous aggregate query @@ -164,23 +164,23 @@ CREATE AGGREGATE newavg ( finalfunc = int8_avg, initcond1 = '{0,0}' ); -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), newavg(temperature::int4) +Select sum(humidity), newavg(temperature::int4) from conditions group by time_bucket('1week', timec) , location; ERROR: aggregates which are not parallelizable are not supported by continuous aggregate query ; -- using subqueries -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from -( select humidity, temperature, location, timec +( select humidity, temperature, location, timec from conditions ) q group by time_bucket('1week', timec) , location ; ERROR: invalid SELECT query for continuous aggregate -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS select * from ( Select sum(humidity), avg(temperature::int4) @@ -188,125 +188,125 @@ from conditions group by time_bucket('1week', timec) , location ) q; ERROR: SELECT query for continuous aggregate should have at least 1 aggregate function and a GROUP BY clause with time_bucket --using limit /limit offset -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location limit 10 ; ERROR: invalid SELECT query for continuous aggregate -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location offset 10; ERROR: invalid SELECT query for continuous aggregate --using ORDER BY in view defintion -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location ORDER BY 1; ERROR: invalid SELECT query for continuous aggregate --using FETCH -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location fetch first 10 rows only; ERROR: invalid SELECT query for continuous aggregate --using locking clauses FOR clause ---all should be disabled. we cannot guarntee locks on the hypertable -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +--all should be disabled. we cannot guarntee locks on the hypertable +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR KEY SHARE; ERROR: FOR KEY SHARE is not allowed with GROUP BY clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR SHARE; ERROR: FOR SHARE is not allowed with GROUP BY clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR UPDATE; ERROR: FOR UPDATE is not allowed with GROUP BY clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR NO KEY UPDATE; ERROR: FOR NO KEY UPDATE is not allowed with GROUP BY clause --tablesample clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions tablesample bernoulli(0.2) group by time_bucket('1week', timec) , location ; ERROR: invalid SELECT query for continuous aggregate -- ONLY in from clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from ONLY conditions group by time_bucket('1week', timec) , location ; ERROR: invalid SELECT query for continuous aggregate --grouping sets and variants -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by grouping sets(time_bucket('1week', timec) , location ) ; ERROR: invalid SELECT query for continuous aggregate -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by rollup(time_bucket('1week', timec) , location ) ; ERROR: invalid SELECT query for continuous aggregate --NO immutable functions -- check all clauses CREATE FUNCTION test_stablefunc(int) RETURNS int LANGUAGE 'sql' STABLE AS 'SELECT $1 + 10'; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum(humidity), max(timec + INTERVAL '1h') from conditions group by time_bucket('1week', timec) , location ; ERROR: only immutable functions are supported for continuous aggregate query -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), min(location) +Select sum(humidity), min(location) from conditions -group by time_bucket('1week', timec) +group by time_bucket('1week', timec) having max(timec + INTERVAL '1h') > '2010-01-01 09:00:00-08'; ERROR: only immutable functions are supported for continuous aggregate query -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum( test_stablefunc(humidity::int) ), min(location) +Select sum( test_stablefunc(humidity::int) ), min(location) from conditions group by time_bucket('1week', timec); ERROR: only immutable functions are supported for continuous aggregate query -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum( temperature ), min(location) +Select sum( temperature ), min(location) from conditions group by time_bucket('1week', timec), test_stablefunc(humidity::int); ERROR: only immutable functions are supported for continuous aggregate query -- row security on table create table rowsec_tab( a bigint, b integer, c integer); -select table_name from create_hypertable( 'rowsec_tab', 'a', chunk_time_interval=>10); +select table_name from create_hypertable( 'rowsec_tab', 'a', chunk_time_interval=>10); NOTICE: adding not-null constraint to column "a" table_name ------------ @@ -315,9 +315,133 @@ NOTICE: adding not-null constraint to column "a" alter table rowsec_tab ENABLE ROW LEVEL SECURITY; create policy rowsec_tab_allview ON rowsec_tab FOR SELECT USING(true); -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum( b), min(c) from rowsec_tab group by time_bucket('1', a); ERROR: continuous aggregate query cannot be created on table with row security +drop table conditions cascade; +--negative tests for WITH options +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); +select table_name from create_hypertable( 'conditions', 'timec'); + table_name +------------ + conditions +(1 row) + +\set ON_ERROR_STOP 0 +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5 joules', timescaledb.refresh_interval = '1h') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); +ERROR: invalid input syntax for type interval: "5 joules" +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5h', timescaledb.refresh_interval = '1 joule') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); +ERROR: invalid value for timescaledb.refresh_interval '1 joule' +\set ON_ERROR_STOP 1 +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5 hours', timescaledb.refresh_interval = '1h') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); +NOTICE: adding not-null constraint to column "time_partition_col" +SELECT h.schema_name AS "MAT_SCHEMA_NAME", + h.table_name AS "MAT_TABLE_NAME", + partial_view_name as "PART_VIEW_NAME", + partial_view_schema as "PART_VIEW_SCHEMA" +FROM _timescaledb_catalog.continuous_agg ca +INNER JOIN _timescaledb_catalog.hypertable h ON(h.id = ca.mat_hypertable_id) +WHERE user_view_name = 'mat_with_test' +\gset +\set ON_ERROR_STOP 0 +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '1 joule'); +ERROR: invalid input syntax for type interval: "1 joule" +ALTER VIEW mat_with_test RESET(timescaledb.refresh_lag); +ERROR: cannot alter only SET options of a continuous aggregate +ALTER VIEW mat_with_test ALTER timec DROP default; +ERROR: cannot alter only SET options of a continuous aggregate +ALTER VIEW :"PART_VIEW_SCHEMA".:"PART_VIEW_NAME" SET(timescaledb.refresh_lag = '1 hour'); +ERROR: cannot alter the internal view of a continuous aggregate +\set ON_ERROR_STOP 1 +DROP TABLE conditions CASCADE; +NOTICE: drop cascades to view _timescaledb_internal.ts_internal_mat_with_testview +--test WITH using a hypertable with an integer time dimension +CREATE TABLE conditions ( + timec SMALLINT NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); +select table_name from create_hypertable( 'conditions', 'timec', chunk_time_interval=> 100); + table_name +------------ + conditions +(1 row) + +\set ON_ERROR_STOP 0 +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '1 hour', timescaledb.refresh_interval = '1h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); +ERROR: time_bucket function for continuous aggregate query should be called on the dimension column of the hypertable +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '32768', timescaledb.refresh_interval = '1h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); +ERROR: time_bucket function for continuous aggregate query should be called on the dimension column of the hypertable +ALTER TABLE conditions ALTER timec type int; +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '2147483648', timescaledb.refresh_interval = '1h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); +ERROR: timescaledb.refresh_lag out of range +\set ON_ERROR_STOP 1 +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '2147483647', timescaledb.refresh_interval = '2h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); +NOTICE: adding not-null constraint to column "time_partition_col" +\set ON_ERROR_STOP 0 +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '1h'); +ERROR: parameter timescaledb.refresh_lag must be an integer for hypertables with integer time values +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '2147483648'); +ERROR: timescaledb.refresh_lag out of range +ALTER TABLE conditions ALTER timec type bigint; +ERROR: cannot alter type of a column used by a view or rule +\set ON_ERROR_STOP +drop view mat_with_test CASCADE; +ALTER TABLE conditions ALTER timec type bigint; +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '2147483647', timescaledb.refresh_interval = '2h') +as +select time_bucket(BIGINT '100', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by 1; +NOTICE: adding not-null constraint to column "time_partition_col" diff --git a/tsl/test/expected/continuous_aggs_materialize.out b/tsl/test/expected/continuous_aggs_materialize.out index 0022d3566..01caf38c6 100644 --- a/tsl/test/expected/continuous_aggs_materialize.out +++ b/tsl/test/expected/continuous_aggs_materialize.out @@ -608,7 +608,7 @@ SELECT * FROM continuous_agg_test_t; (4 rows) CREATE VIEW test_t_mat_view - WITH ( timescaledb.continuous_agg = 'start') + WITH ( timescaledb.continuous) AS SELECT time_bucket('2 hours', time), COUNT(data) as value FROM continuous_agg_test_t GROUP BY 1; @@ -820,7 +820,7 @@ NOTICE: adding not-null constraint to column "time" -- TODO we should be able to use time_bucket(5, ...) (note lack of '), but that is currently not -- recognized as a constant CREATE VIEW extreme_view - WITH ( timescaledb.continuous_agg = 'start') + WITH ( timescaledb.continuous) AS SELECT time_bucket('1', time), SUM(data) as value FROM continuous_agg_extreme GROUP BY 1; diff --git a/tsl/test/sql/bgw_continuous_aggs.sql b/tsl/test/sql/bgw_continuous_aggs.sql index 0a420dd7f..49115f405 100644 --- a/tsl/test/sql/bgw_continuous_aggs.sql +++ b/tsl/test/sql/bgw_continuous_aggs.sql @@ -62,7 +62,7 @@ SELECT * FROM _timescaledb_catalog.continuous_agg; CREATE TABLE test_continuous_agg_table(time int, data int); SELECT create_hypertable('test_continuous_agg_table', 'time', chunk_time_interval => 10); CREATE VIEW test_continuous_agg_view - WITH ( timescaledb.continuous_agg = 'start') + WITH ( timescaledb.continuous) AS SELECT time_bucket('2', time), SUM(data) as value FROM test_continuous_agg_table GROUP BY 1; diff --git a/tsl/test/sql/contaggviews.sql b/tsl/test/sql/contaggviews.sql index 36342954d..bf0014282 100644 --- a/tsl/test/sql/contaggviews.sql +++ b/tsl/test/sql/contaggviews.sql @@ -37,7 +37,7 @@ insert into foo values( 2 , 15 , 20); insert into foo values( 2 , 16 , 20); create or replace view mat_m1( a, countb ) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select a, count(b) from foo @@ -85,7 +85,7 @@ insert into conditions values ( '2018-11-02 09:00:00-08', 'NYC', 35, 15); create or replace view mat_m1( timec, minl, sumt , sumh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) from conditions @@ -112,7 +112,6 @@ order by 1; -- TEST3 -- -- drop on table conditions should cascade to materialized mat_v1 --- Oid used in timescaledb_catalog table from catalog table -- TODO drop table conditions cascade; @@ -137,7 +136,7 @@ insert into conditions values ( '2018-11-03 09:00:00-08', 'NYC', 35, 25); create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -168,12 +167,11 @@ order by time_bucket('1week', timec); --materialized view with group by clause + expression in SELECT -- use previous data from conditions --drop only the view. --- TODO catalog entry should get deleted? -- apply where clause on result of mat_m1 -- drop view mat_m1 cascade; create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -209,7 +207,7 @@ order by time_bucket('1week', timec); ---------test with having clause ---------------------- drop view mat_m1 cascade; create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -264,7 +262,7 @@ select generate_series('2018-11-01 00:00'::timestamp, '2018-12-15 00:00'::timest --drop view mat_m1 cascade; create or replace view mat_m1( timec, minl, sumth, stddevh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1week', timec) , min(location), sum(temperature)+sum(humidity), stddev(humidity) @@ -416,7 +414,7 @@ select table_name from create_hypertable( 'conditions', 'timec'); --no data in hyper table on purpose so that CASCADE is not required because of chunks create or replace view mat_drop_test( timec, minl, sumt , sumh) -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) from conditions @@ -472,3 +470,59 @@ SELECT * FROM _timescaledb_config.bgw_job; select count(*) from pg_class where relname = :'PART_VIEW_NAME'; select count(*) from pg_class where relname = :'MAT_TABLE_NAME'; select count(*) from pg_class where relname = 'mat_drop_test'; + +--TEST With options + +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); + +select table_name from create_hypertable( 'conditions', 'timec'); + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5 hours', timescaledb.refresh_interval = '1h') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); + +SELECT schedule_interval FROM _timescaledb_config.bgw_job; +SELECT _timescaledb_internal.to_interval(refresh_lag) FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; + +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '6 h', timescaledb.refresh_interval = '2h'); +SELECT _timescaledb_internal.to_interval(refresh_lag) FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; +SELECT schedule_interval FROM _timescaledb_config.bgw_job; + +DROP TABLE conditions CASCADE; + +--test WITH using a hypertable with an integer time dimension +CREATE TABLE conditions ( + timec INT NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); + +select table_name from create_hypertable( 'conditions', 'timec', chunk_time_interval=> 100); + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '500', timescaledb.refresh_interval = '2h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); + +SELECT schedule_interval FROM _timescaledb_config.bgw_job; +SELECT refresh_lag FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; + +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '100'); +SELECT refresh_lag FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'mat_with_test'; diff --git a/tsl/test/sql/contaggviews_ddl.sql.in b/tsl/test/sql/contaggviews_ddl.sql.in index c72c929f9..b6f4df2bb 100644 --- a/tsl/test/sql/contaggviews_ddl.sql.in +++ b/tsl/test/sql/contaggviews_ddl.sql.in @@ -19,7 +19,7 @@ select table_name from create_hypertable('conditions', 'timec'); -- check that GRANTS work correctly \c :TEST_DBNAME :ROLE_SUPERUSER -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum( temperature ), min(location) from conditions @@ -43,7 +43,7 @@ GRANT ALL ON SCHEMA rename_schema TO :ROLE_DEFAULT_PERM_USER; CREATE TABLE foo(time TIMESTAMPTZ, data INTEGER); SELECT create_hypertable('foo', 'time'); -CREATE VIEW rename_test WITH ( timescaledb.continuous_agg = 'start') +CREATE VIEW rename_test WITH ( timescaledb.continuous) AS SELECT time_bucket('1week', time), COUNT(data) FROM foo GROUP BY 1; diff --git a/tsl/test/sql/contaggviews_neg.sql b/tsl/test/sql/contaggviews_neg.sql index faeac65af..8f62daae2 100644 --- a/tsl/test/sql/contaggviews_neg.sql +++ b/tsl/test/sql/contaggviews_neg.sql @@ -13,30 +13,30 @@ CREATE TABLE conditions ( temperature integer NULL, humidity DOUBLE PRECISION NULL, timemeasure TIMESTAMPTZ, - timeinterval INTERVAL + timeinterval INTERVAL ); select table_name from create_hypertable( 'conditions', 'timec'); -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start', timescaledb.myfill = 1) +create view mat_m1 WITH ( timescaledb.continuous, timescaledb.myfill = 1) as select location , min(temperature) -from conditions +from conditions group by time_bucket('1d', timec), location; --valid PG option -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start', check_option = LOCAL ) +create view mat_m1 WITH ( timescaledb.continuous, check_option = LOCAL ) as select * from conditions , matt1; -- join multiple tables -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select location, count(*) from conditions , mat_t1 where conditions.location = mat_t1.c group by location; -- LATERAL multiple tables -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select location, count(*) from conditions, LATERAL (Select * from mat_t1 where c = conditions.location) q @@ -44,51 +44,51 @@ group by location; --non-hypertable -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select a, count(*) from mat_t1 group by a; -- no group by -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select count(*) from conditions ; -- no time_bucket in group by -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select count(*) from conditions group by location; -- with valid query in a CTE -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS with m1 as ( Select location, count(*) from conditions - group by time_bucket('1week', timec) , location) + group by time_bucket('1week', timec) , location) select * from m1; --with DISTINCT ON -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) as select distinct on ( location ) count(*) from conditions group by location, time_bucket('1week', timec) ; --aggregate with DISTINCT -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select time_bucket('1week', timec), count(location) , sum(distinct temperature) from conditions group by time_bucket('1week', timec) , location; --aggregate with FILTER -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select time_bucket('1week', timec), sum(temperature) filter ( where humidity > 20 ) from conditions group by time_bucket('1week', timec) , location; -- aggregate with filter in having clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select time_bucket('1week', timec), max(temperature) from conditions @@ -96,65 +96,65 @@ from conditions having sum(temperature) filter ( where humidity > 20 ) > 50; -- time_bucket on non partitioning column of hypertable -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket('1week', timemeasure) , location; --time_bucket on expression -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket('1week', timec+ '10 minutes'::interval) , location; --multiple time_bucket functions -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket('1week', timec) , time_bucket('1month', timec), location; --time_bucket using additional args -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket( INTERVAL '5 minutes', timec, INTERVAL '-2.5 minutes') , location; --time_bucket using non-const for first argument -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select max(temperature) from conditions group by time_bucket( timeinterval, timec) , location; -- ordered set aggr -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select mode() within group( order by humidity) from conditions group by time_bucket('1week', timec) ; --window function -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select avg(temperature) over( order by humidity) from conditions ; --aggregate without combine function -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select json_agg(location) +Select json_agg(location) from conditions group by time_bucket('1week', timec) , location; ; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature), array_agg(location) +Select sum(humidity), avg(temperature), array_agg(location) from conditions group by time_bucket('1week', timec) , location; ; @@ -165,23 +165,23 @@ CREATE AGGREGATE newavg ( finalfunc = int8_avg, initcond1 = '{0,0}' ); -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), newavg(temperature::int4) +Select sum(humidity), newavg(temperature::int4) from conditions group by time_bucket('1week', timec) , location; ; -- using subqueries -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from -( select humidity, temperature, location, timec +( select humidity, temperature, location, timec from conditions ) q group by time_bucket('1week', timec) , location ; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS select * from ( Select sum(humidity), avg(temperature::int4) @@ -189,92 +189,92 @@ from conditions group by time_bucket('1week', timec) , location ) q; --using limit /limit offset -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location limit 10 ; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location offset 10; --using ORDER BY in view defintion -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location ORDER BY 1; --using FETCH -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location fetch first 10 rows only; --using locking clauses FOR clause ---all should be disabled. we cannot guarntee locks on the hypertable -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +--all should be disabled. we cannot guarntee locks on the hypertable +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR KEY SHARE; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR SHARE; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR UPDATE; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by time_bucket('1week', timec) , location FOR NO KEY UPDATE; --tablesample clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions tablesample bernoulli(0.2) group by time_bucket('1week', timec) , location ; -- ONLY in from clause -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from ONLY conditions group by time_bucket('1week', timec) , location ; --grouping sets and variants -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by grouping sets(time_bucket('1week', timec) , location ) ; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), avg(temperature::int4) +Select sum(humidity), avg(temperature::int4) from conditions group by rollup(time_bucket('1week', timec) , location ) ; @@ -282,39 +282,156 @@ group by rollup(time_bucket('1week', timec) , location ) ; CREATE FUNCTION test_stablefunc(int) RETURNS int LANGUAGE 'sql' STABLE AS 'SELECT $1 + 10'; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum(humidity), max(timec + INTERVAL '1h') from conditions group by time_bucket('1week', timec) , location ; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum(humidity), min(location) +Select sum(humidity), min(location) from conditions -group by time_bucket('1week', timec) +group by time_bucket('1week', timec) having max(timec + INTERVAL '1h') > '2010-01-01 09:00:00-08'; -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum( test_stablefunc(humidity::int) ), min(location) +Select sum( test_stablefunc(humidity::int) ), min(location) from conditions group by time_bucket('1week', timec); -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS -Select sum( temperature ), min(location) +Select sum( temperature ), min(location) from conditions group by time_bucket('1week', timec), test_stablefunc(humidity::int); -- row security on table create table rowsec_tab( a bigint, b integer, c integer); -select table_name from create_hypertable( 'rowsec_tab', 'a', chunk_time_interval=>10); +select table_name from create_hypertable( 'rowsec_tab', 'a', chunk_time_interval=>10); alter table rowsec_tab ENABLE ROW LEVEL SECURITY; create policy rowsec_tab_allview ON rowsec_tab FOR SELECT USING(true); -create view mat_m1 WITH ( timescaledb.continuous_agg = 'start') +create view mat_m1 WITH ( timescaledb.continuous) AS Select sum( b), min(c) from rowsec_tab group by time_bucket('1', a); + +drop table conditions cascade; + +--negative tests for WITH options +CREATE TABLE conditions ( + timec TIMESTAMPTZ NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); + +select table_name from create_hypertable( 'conditions', 'timec'); + +\set ON_ERROR_STOP 0 +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5 joules', timescaledb.refresh_interval = '1h') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5h', timescaledb.refresh_interval = '1 joule') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); +\set ON_ERROR_STOP 1 + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '5 hours', timescaledb.refresh_interval = '1h') +as +select time_bucket('1day', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket('1day', timec); + +SELECT h.schema_name AS "MAT_SCHEMA_NAME", + h.table_name AS "MAT_TABLE_NAME", + partial_view_name as "PART_VIEW_NAME", + partial_view_schema as "PART_VIEW_SCHEMA" +FROM _timescaledb_catalog.continuous_agg ca +INNER JOIN _timescaledb_catalog.hypertable h ON(h.id = ca.mat_hypertable_id) +WHERE user_view_name = 'mat_with_test' +\gset + +\set ON_ERROR_STOP 0 +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '1 joule'); +ALTER VIEW mat_with_test RESET(timescaledb.refresh_lag); +ALTER VIEW mat_with_test ALTER timec DROP default; +ALTER VIEW :"PART_VIEW_SCHEMA".:"PART_VIEW_NAME" SET(timescaledb.refresh_lag = '1 hour'); +\set ON_ERROR_STOP 1 + +DROP TABLE conditions CASCADE; + + +--test WITH using a hypertable with an integer time dimension +CREATE TABLE conditions ( + timec SMALLINT NOT NULL, + location TEXT NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL, + lowp double precision NULL, + highp double precision null, + allnull double precision null + ); + +select table_name from create_hypertable( 'conditions', 'timec', chunk_time_interval=> 100); + +\set ON_ERROR_STOP 0 +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '1 hour', timescaledb.refresh_interval = '1h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '32768', timescaledb.refresh_interval = '1h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); + +ALTER TABLE conditions ALTER timec type int; + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '2147483648', timescaledb.refresh_interval = '1h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); +\set ON_ERROR_STOP 1 + +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '2147483647', timescaledb.refresh_interval = '2h') +as +select time_bucket(100, timec), min(location), sum(temperature),sum(humidity) +from conditions +group by time_bucket(100, timec); + +\set ON_ERROR_STOP 0 +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '1h'); +ALTER VIEW mat_with_test SET(timescaledb.refresh_lag = '2147483648'); +ALTER TABLE conditions ALTER timec type bigint; +\set ON_ERROR_STOP +drop view mat_with_test CASCADE; + +ALTER TABLE conditions ALTER timec type bigint; +create or replace view mat_with_test( timec, minl, sumt , sumh) +WITH ( timescaledb.continuous, timescaledb.refresh_lag = '2147483647', timescaledb.refresh_interval = '2h') +as +select time_bucket(BIGINT '100', timec), min(location), sum(temperature),sum(humidity) +from conditions +group by 1; diff --git a/tsl/test/sql/continuous_aggs_materialize.sql b/tsl/test/sql/continuous_aggs_materialize.sql index da7f06894..6c19dfdbf 100644 --- a/tsl/test/sql/continuous_aggs_materialize.sql +++ b/tsl/test/sql/continuous_aggs_materialize.sql @@ -260,7 +260,7 @@ SET SESSION datestyle TO 'ISO'; SELECT * FROM continuous_agg_test_t; CREATE VIEW test_t_mat_view - WITH ( timescaledb.continuous_agg = 'start') + WITH ( timescaledb.continuous) AS SELECT time_bucket('2 hours', time), COUNT(data) as value FROM continuous_agg_test_t GROUP BY 1; @@ -348,7 +348,7 @@ SELECT create_hypertable('continuous_agg_extreme', 'time', chunk_time_interval=> -- TODO we should be able to use time_bucket(5, ...) (note lack of '), but that is currently not -- recognized as a constant CREATE VIEW extreme_view - WITH ( timescaledb.continuous_agg = 'start') + WITH ( timescaledb.continuous) AS SELECT time_bucket('1', time), SUM(data) as value FROM continuous_agg_extreme GROUP BY 1; diff --git a/tsl/test/sql/include/cont_agg_equal.sql b/tsl/test/sql/include/cont_agg_equal.sql index 0b2de063e..b7f148787 100644 --- a/tsl/test/sql/include/cont_agg_equal.sql +++ b/tsl/test/sql/include/cont_agg_equal.sql @@ -8,7 +8,7 @@ drop view if exists mat_test cascade; create view mat_test -WITH ( timescaledb.continuous_agg = 'start') +WITH ( timescaledb.continuous) as :QUERY ;