From f49492b83dd2b206cff20d5537cb68ab8d0596a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Mon, 7 Sep 2020 17:07:43 +0200 Subject: [PATCH] Cap invalidation threshold at last data bucket When refreshing with an "infinite" refresh window going forward in time, the invalidation threshold is also moved forward to the end of the valid time range. This effectively renders the invalidation threshold useless, leading to unnecessary write amplification. To handle infinite refreshes better, this change caps the refresh window at the end of the last bucket of data in the underlying hypertable, so as not to move the invalidation threshold further than necessary. For instance, if the max time value in the hypertable is 11, a refresh command such as: ``` CALL refresh_continuous_aggregate('cagg_name', NULL, NULL); ``` would be turned into ``` CALL refresh_continuous_aggregate('cagg_name', NULL, 20); ``` assuming that a bucket starts at 10 and ends at 20 (exclusive). Thus the invalidation threshold would at most move to 20, allowing the threshold to still do its work once time again moves forward and beyond it. Note that one must never process invalidations beyond the invalidation threshold without also moving it, as that would clear that area of invalidations and thus prohibit refreshing that region once the invalidation threshold is moved forward. Therefore, if we do not move the threshold further than a certain point, we cannot refresh beyond it either. An alternative, and perhaps safer, approach would be to always invalidate the region over which the invalidation threshold is moved (i.e., new_threshold - old_threshold). However, that is left for a future change. It would be possible to also cap non-infinite refreshes, e.g., refreshes that end at a higher time value than the max time value in the hypertable. However, when an explicit end is specified, it might be on purpose, so optimizing this case is also left for the future. 
Closes #2333 --- src/continuous_agg.c | 35 +-- src/hypertable.c | 49 ++++ src/hypertable.h | 2 + src/time_utils.c | 9 + src/time_utils.h | 5 + .../continuous_aggs/invalidation_threshold.c | 72 +++++- .../continuous_aggs/invalidation_threshold.h | 13 +- tsl/src/continuous_aggs/materialize.c | 6 +- tsl/src/continuous_aggs/refresh.c | 55 ++++- .../expected/continuous_aggs_invalidation.out | 227 +++++++++++++++++- tsl/test/expected/continuous_aggs_refresh.out | 3 +- .../continuous_aggs_union_view-11.out | 6 + .../continuous_aggs_union_view-12.out | 6 + tsl/test/sql/continuous_aggs_invalidation.sql | 140 ++++++++++- tsl/test/src/test_continuous_aggs.c | 2 +- 15 files changed, 557 insertions(+), 73 deletions(-) diff --git a/src/continuous_agg.c b/src/continuous_agg.c index 13fe0879e..af784017e 100644 --- a/src/continuous_agg.c +++ b/src/continuous_agg.c @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -1020,7 +1019,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) { const int32 hyper_id = PG_GETARG_INT32(0); ContinuousAgg *cagg; - StringInfo command; Hypertable *ht; Dimension *dim; Datum maxdat; @@ -1028,7 +1026,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) int64 watermark; Oid timetype; AclResult aclresult; - int res; if (PG_ARGISNULL(0)) ereport(ERROR, @@ -1051,34 +1048,7 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) Assert(NULL != ht); dim = hyperspace_get_open_dimension(ht->space, 0); timetype = ts_dimension_get_partition_type(dim); - - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "could not connect to SPI"); - - /* Query for the last bucket in the materialized hypertable */ - command = makeStringInfo(); - appendStringInfo(command, - "SELECT max(%s) FROM %s.%s", - quote_identifier(NameStr(dim->fd.column_name)), - quote_identifier(NameStr(ht->fd.schema_name)), - quote_identifier(NameStr(ht->fd.table_name))); - - res = SPI_execute_with_args(command->data, - 0 /*=nargs*/, - NULL, - NULL, - NULL /*=Nulls*/, - true 
/*=read_only*/, - 0 /*count*/); - if (res < 0) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - (errmsg("could not find the maximum time value for hypertable"), - errdetail("SPI error when calculating continuous aggregate watermark: %d.", - res)))); - - Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype); - maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull); + maxdat = ts_hypertable_get_open_dim_max_value(ht, 0, &max_isnull); if (!max_isnull) { @@ -1092,8 +1062,5 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) watermark = ts_time_get_min(timetype); } - res = SPI_finish(); - Assert(res == SPI_OK_FINISH); - PG_RETURN_INT64(watermark); } diff --git a/src/hypertable.c b/src/hypertable.c index f5848e3eb..9d7ea73d0 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -2681,3 +2682,51 @@ ts_hypertable_func_call_on_data_nodes(Hypertable *ht, FunctionCallInfo fcinfo) if (hypertable_is_distributed(ht)) ts_cm_functions->func_call_on_data_nodes(fcinfo, ts_hypertable_get_data_node_name_list(ht)); } + +/* + * Get the max value of an open dimension. 
+ */ +Datum +ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull) +{ + StringInfo command; + Dimension *dim; + int res; + bool max_isnull; + Datum maxdat; + + dim = hyperspace_get_open_dimension(ht->space, dimension_index); + + if (NULL == dim) + elog(ERROR, "invalid open dimension index %d", dimension_index); + + /* Query for the last bucket in the materialized hypertable */ + command = makeStringInfo(); + appendStringInfo(command, + "SELECT max(%s) FROM %s.%s", + quote_identifier(NameStr(dim->fd.column_name)), + quote_identifier(NameStr(ht->fd.schema_name)), + quote_identifier(NameStr(ht->fd.table_name))); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "could not connect to SPI"); + + res = SPI_execute(command->data, true /* read_only */, 0 /*count*/); + + if (res < 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + (errmsg("could not find the maximum time value for hypertable \"%s\"", + get_rel_name(ht->main_table_relid))))); + + Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == ts_dimension_get_partition_type(dim)); + maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull); + + if (isnull) + *isnull = max_isnull; + + res = SPI_finish(); + Assert(res == SPI_OK_FINISH); + + return maxdat; +} diff --git a/src/hypertable.h b/src/hypertable.h index 7c0f7b544..d319b49f1 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -156,6 +156,8 @@ extern TSDLLEXPORT void ts_hypertable_func_call_on_data_nodes(Hypertable *ht, FunctionCallInfo fcinfo); extern TSDLLEXPORT int16 ts_validate_replication_factor(int32 replication_factor, bool is_null, bool is_dist_call); +extern TSDLLEXPORT Datum ts_hypertable_get_open_dim_max_value(const Hypertable *ht, + int dimension_index, bool *isnull); #define hypertable_scan(schema, table, tuple_found, data, lockmode, tuplock) \ ts_hypertable_scan_with_memory_context(schema, \ diff --git a/src/time_utils.c b/src/time_utils.c index 140292814..c504f6cec 
100644 --- a/src/time_utils.c +++ b/src/time_utils.c @@ -436,6 +436,15 @@ ts_time_get_noend(Oid timetype) return ts_time_get_noend(coerce_to_time_type(timetype)); } +int64 +ts_time_get_noend_or_max(Oid timetype) +{ + if (TS_TIME_IS_INTEGER_TIME(timetype)) + return ts_time_get_max(timetype); + + return ts_time_get_noend(timetype); +} + /* * Add an interval to a time value in a saturating way. * diff --git a/src/time_utils.h b/src/time_utils.h index 25777b2eb..82d8b3682 100644 --- a/src/time_utils.h +++ b/src/time_utils.h @@ -54,6 +54,10 @@ (TS_TIME_IS_INTEGER_TIME(type) || TS_TIME_DATUM_IS_NOBEGIN(timeval, type) || \ TS_TIME_DATUM_IS_NOEND(timeval, type)) +#define TS_TIME_IS_MIN(timeval, type) (timeval == ts_time_get_min(type)) +#define TS_TIME_IS_MAX(timeval, type) (timeval == ts_time_get_max(type)) +#define TS_TIME_IS_END(timeval, type) \ + (!TS_TIME_IS_INTEGER_TIME(type) && timeval == ts_time_get_end(type)) #define TS_TIME_IS_NOBEGIN(timeval, type) \ (!TS_TIME_IS_INTEGER_TIME(type) && timeval == ts_time_get_nobegin(type)) #define TS_TIME_IS_NOEND(timeval, type) \ @@ -76,6 +80,7 @@ extern TSDLLEXPORT int64 ts_time_get_end(Oid timetype); extern TSDLLEXPORT int64 ts_time_get_end_or_max(Oid timetype); extern TSDLLEXPORT int64 ts_time_get_nobegin(Oid timetype); extern TSDLLEXPORT int64 ts_time_get_noend(Oid timetype); +extern TSDLLEXPORT int64 ts_time_get_noend_or_max(Oid timetype); extern TSDLLEXPORT int64 ts_time_saturating_add(int64 timeval, int64 interval, Oid timetype); extern TSDLLEXPORT int64 ts_time_saturating_sub(int64 timeval, int64 interval, Oid timetype); diff --git a/tsl/src/continuous_aggs/invalidation_threshold.c b/tsl/src/continuous_aggs/invalidation_threshold.c index 010d9a0d8..578805a8c 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.c +++ b/tsl/src/continuous_aggs/invalidation_threshold.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,8 @@ #include #include #include +#include +#include #include 
"continuous_agg.h" #include "continuous_aggs/materialize.h" @@ -84,11 +87,12 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) else { elog(DEBUG1, - "hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT + "hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT " " INT64_FORMAT, form->hypertable_id, form->watermark, invthresh->threshold); + invthresh->threshold = form->watermark; } if (should_free) @@ -97,12 +101,17 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) return SCAN_DONE; } -/* every cont. agg calculates its invalidation_threshold point based on its - *refresh_lag etc. We update the raw hypertable's invalidation threshold - * only if this new value is greater than the existsing one. +/* + * Set a new invalidation threshold. + * + * The threshold is only updated if the new threshold is greater than the old + * one. + * + * On success, the new threshold is returned, otherwise the existing threshold + * is returned instead. */ -bool -continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold) +int64 +invalidation_threshold_set_or_get(int32 raw_hypertable_id, int64 invalidation_threshold) { bool threshold_found; InvalidationThresholdData data = { @@ -156,10 +165,9 @@ continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalid ts_catalog_insert_values(rel, desc, values, nulls); table_close(rel, NoLock); - data.was_updated = true; } - return data.was_updated; + return data.threshold; } static ScanTupleResult @@ -177,7 +185,7 @@ invalidation_threshold_tuple_found(TupleInfo *ti, void *data) } int64 -continuous_agg_invalidation_threshold_get(int32 hypertable_id) +invalidation_threshold_get(int32 hypertable_id) { int64 threshold = 0; ScanKeyData scankey[1]; @@ -220,7 +228,7 @@ invalidation_threshold_htid_found(TupleInfo *tinfo, void *data) * block till lock is acquired. 
*/ void -continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id) +invalidation_threshold_lock(int32 raw_hypertable_id) { ScanTupLock scantuplock = { .waitpolicy = LockWaitBlock, @@ -259,3 +267,47 @@ continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id) errmsg("found multiple invalidation rows for hypertable %d", raw_hypertable_id))); } } + +/* + * Compute a new invalidation threshold. + * + * The new invalidation threshold returned is the end of the given refresh + * window, unless it ends at "infinity" in which case the threshold is capped + * at the end of the last bucket materialized. + */ +int64 +invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window) +{ + bool max_refresh = false; + Hypertable *ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id); + + if (TS_TIME_IS_INTEGER_TIME(refresh_window->type)) + max_refresh = TS_TIME_IS_MAX(refresh_window->end, refresh_window->type); + else + max_refresh = TS_TIME_IS_END(refresh_window->end, refresh_window->type) || + TS_TIME_IS_NOEND(refresh_window->end, refresh_window->type); + + if (max_refresh) + { + bool isnull; + Datum maxdat = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull); + + if (isnull) + { + /* No data in hypertable, so return min (start of time) */ + return ts_time_get_min(refresh_window->type); + } + else + { + int64 maxval = ts_time_value_to_internal(maxdat, refresh_window->type); + int64 bucket_start = + ts_time_bucket_by_type(cagg->data.bucket_width, maxval, refresh_window->type); + /* Add one bucket to get to the end of the last bucket */ + return ts_time_saturating_add(bucket_start, + cagg->data.bucket_width, + refresh_window->type); + } + } + + return refresh_window->end; +} diff --git a/tsl/src/continuous_aggs/invalidation_threshold.h b/tsl/src/continuous_aggs/invalidation_threshold.h index 483bd20fe..8ab6940fa 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.h +++ 
b/tsl/src/continuous_aggs/invalidation_threshold.h @@ -8,9 +8,14 @@ #include -extern int64 continuous_agg_invalidation_threshold_get(int32 hypertable_id); -extern bool continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, - int64 invalidation_threshold); -extern void continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id); +typedef struct InternalTimeRange InternalTimeRange; +typedef struct ContinuousAgg ContinuousAgg; + +extern int64 invalidation_threshold_get(int32 hypertable_id); +extern int64 invalidation_threshold_set_or_get(int32 raw_hypertable_id, + int64 invalidation_threshold); +extern void invalidation_threshold_lock(int32 raw_hypertable_id); +extern int64 invalidation_threshold_compute(const ContinuousAgg *cagg, + const InternalTimeRange *refresh_window); #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_H */ diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 9b4f7d21c..749038765 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -222,7 +222,7 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op * (github issue 1940) * Prevent this by serializing on the raw hypertable row */ - continuous_agg_invalidation_threshold_lock(cagg_data.raw_hypertable_id); + invalidation_threshold_lock(cagg_data.raw_hypertable_id); drain_invalidation_log(cagg_data.raw_hypertable_id, &invalidations); materialization_invalidation_log_table_relation = table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG), @@ -292,8 +292,8 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op { LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), AccessExclusiveLock); - continuous_agg_invalidation_threshold_set(cagg_data.raw_hypertable_id, - materialization_invalidation_threshold); + 
invalidation_threshold_set_or_get(cagg_data.raw_hypertable_id, + materialization_invalidation_threshold); } table_close(materialization_invalidation_log_table_relation, NoLock); diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 756fa1db2..31d07f1e4 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -253,9 +253,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) Dimension *time_dim; InternalTimeRange refresh_window = { .type = InvalidOid, - .start = PG_INT64_MIN, - .end = PG_INT64_MAX, }; + if (!OidIsValid(cagg_relid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid continuous aggregate"))); @@ -275,6 +274,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("relation \"%s\" is not a continuous aggregate", relname)))); } + cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id); Assert(cagg_ht != NULL); time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0); @@ -285,15 +285,28 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) refresh_window.start = ts_time_value_from_arg(PG_GETARG_DATUM(1), get_fn_expr_argtype(fcinfo->flinfo, 1), refresh_window.type); + else + refresh_window.start = ts_time_get_min(refresh_window.type); if (!PG_ARGISNULL(2)) refresh_window.end = ts_time_value_from_arg(PG_GETARG_DATUM(2), get_fn_expr_argtype(fcinfo->flinfo, 2), refresh_window.type); + else + refresh_window.end = ts_time_get_noend_or_max(refresh_window.type); + continuous_agg_refresh_internal(cagg, &refresh_window); PG_RETURN_VOID(); } +static void +emit_up_to_date_notice(const ContinuousAgg *cagg) +{ + elog(NOTICE, + "continuous aggregate \"%s\" is already up-to-date", + NameStr(cagg->data.user_view_name)); +} + void continuous_agg_refresh_internal(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window_arg) @@ -303,6 +316,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, int32 mat_id = 
cagg->data.mat_hypertable_id; InternalTimeRange refresh_window; InvalidationStore *invalidations = NULL; + int64 computed_invalidation_threshold; + int64 invalidation_threshold; PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME); @@ -342,7 +357,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, */ LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), AccessExclusiveLock); - continuous_agg_invalidation_threshold_set(cagg->data.raw_hypertable_id, refresh_window.end); + + /* Compute new invalidation threshold. Note that this computation caps the + * threshold at the end of the last bucket that holds data in the + * underlying hypertable. */ + computed_invalidation_threshold = invalidation_threshold_compute(cagg, &refresh_window); + + /* Set the new invalidation threshold. Note that this only updates the + * threshold if the new value is greater than the old one. Otherwise, the + * existing threshold is returned. */ + invalidation_threshold = invalidation_threshold_set_or_get(cagg->data.raw_hypertable_id, + computed_invalidation_threshold); + + /* We must also cap the refresh window at the invalidation threshold. If + * we process invalidations after the threshold, the continuous aggregates + * won't be refreshed when the threshold is moved forward in the + * future. The invalidation threshold should already be aligned on bucket + * boundary. */ + if (refresh_window.end > invalidation_threshold) + { + refresh_window.end = invalidation_threshold; + + /* Capping the end might have made the window 0, or negative, so + * nothing to refresh in that case */ + if (refresh_window.start >= refresh_window.end) + { + emit_up_to_date_notice(cagg); + return; + } + } + + /* Process invalidations in the hypertable invalidation log */ invalidation_process_hypertable_log(cagg, &refresh_window); /* Start a new transaction. 
Note that this invalidates previous memory @@ -370,7 +415,5 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, invalidation_store_free(invalidations); } else - elog(NOTICE, - "continuous aggregate \"%s\" is already up-to-date", - NameStr(cagg->data.user_view_name)); + emit_up_to_date_notice(cagg); } diff --git a/tsl/test/expected/continuous_aggs_invalidation.out b/tsl/test/expected/continuous_aggs_invalidation.out index 6d3fb135c..6b88005fa 100644 --- a/tsl/test/expected/continuous_aggs_invalidation.out +++ b/tsl/test/expected/continuous_aggs_invalidation.out @@ -26,25 +26,25 @@ SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10); (2,public,measurements,t) (1 row) -CREATE OR REPLACE FUNCTION cond_now() +CREATE OR REPLACE FUNCTION bigint_now() RETURNS bigint LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM conditions $$; -CREATE OR REPLACE FUNCTION measure_now() +CREATE OR REPLACE FUNCTION int_now() RETURNS int LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM measurements $$; -SELECT set_integer_now_func('conditions', 'cond_now'); +SELECT set_integer_now_func('conditions', 'bigint_now'); set_integer_now_func ---------------------- (1 row) -SELECT set_integer_now_func('measurements', 'measure_now'); +SELECT set_integer_now_func('measurements', 'int_now'); set_integer_now_func ---------------------- @@ -527,7 +527,7 @@ SELECT materialization_id AS cagg_id, cagg_id | start | end ---------+----------------------+---------------------- 3 | -9223372036854775808 | -9223372036854775801 - 3 | 9223372036854775807 | 9223372036854775807 + 3 | 110 | 9223372036854775807 4 | -9223372036854775808 | 19 4 | 15 | 42 4 | 20 | 25 @@ -851,11 +851,11 @@ SELECT materialization_id AS cagg_id, FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log WHERE materialization_id = :cond_1_id ORDER BY 1,2,3; - cagg_id | start | end ----------+---------------------+--------------------- - 6 | 0 | 0 - 6 | 2 | 2 - 6 | 
9223372036854775807 | 9223372036854775807 + cagg_id | start | end +---------+-------+--------------------- + 6 | 0 | 0 + 6 | 2 | 2 + 6 | 110 | 9223372036854775807 (3 rows) -- Refresh the two remaining invalidations @@ -894,3 +894,210 @@ ORDER BY 1,2; 2 | 1 | 3 (3 rows) +---------------------------------------------- +-- Test that invalidation threshold is capped +---------------------------------------------- +CREATE table threshold_test (time int, value int); +SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); +NOTICE: adding not-null constraint to column "time" + create_hypertable +----------------------------- + (7,public,threshold_test,t) +(1 row) + +SELECT set_integer_now_func('threshold_test', 'int_now'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW thresh_2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1; +SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'thresh_2' \gset +-- There's no invalidation threshold initially +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- Test that threshold is initilized to min value when there's no data +-- and we specify an infinite end. Note that the min value may differ +-- depending on time type. 
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) + +INSERT INTO threshold_test +SELECT v, v FROM generate_series(1, 10) v; +CALL refresh_continuous_aggregate('thresh_2', 0, 5); +-- Threshold should move to end of refresh window (note that window +-- expands to end of bucket). +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 6 +(1 row) + +-- Refresh where both the start and end of the window is above the +-- max data value +CALL refresh_continuous_aggregate('thresh_2', 14, NULL); +NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT watermark AS thresh_hyper_id_watermark +FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id \gset +-- Refresh where we start from the current watermark to infinity +CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); +NOTICE: continuous aggregate "thresh_2" is already up-to-date +-- Now refresh with max end of the window to test that the +-- invalidation threshold is capped at the last bucket of data +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Should not have processed invalidations beyond the invalidation +-- threshold. 
+SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 12 | 9223372036854775807 +(2 rows) + +-- Check that things are properly materialized +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Delete the last data +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('threshold_test') +ORDER BY 1 DESC +LIMIT 1 \gset +DELETE FROM threshold_test +WHERE time > 6; +-- The last data in the hypertable is gone +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The aggregate still holds data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Refresh the aggregate to bring it up-to-date +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Data also gone from the aggregate +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The invalidation threshold remains the same +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Insert new data beyond the invalidation threshold to move it +-- forward +INSERT INTO threshold_test +SELECT v, v FROM generate_series(7, 15) v; +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Aggregate now updated to reflect newly aggregated data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 
3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 11 + 12 | 13 + 14 | 15 +(8 rows) + +-- The invalidation threshold should have moved forward to the end of +-- the new data +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 16 +(1 row) + +-- The aggregate remains invalid beyond the invalidation threshold +SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 16 | 9223372036854775807 +(2 rows) + diff --git a/tsl/test/expected/continuous_aggs_refresh.out b/tsl/test/expected/continuous_aggs_refresh.out index 7a7c3b363..cb0df7492 100644 --- a/tsl/test/expected/continuous_aggs_refresh.out +++ b/tsl/test/expected/continuous_aggs_refresh.out @@ -83,7 +83,7 @@ ORDER BY day DESC, device; SET client_min_messages TO DEBUG1; CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03'); DEBUG: refreshing continuous aggregate "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sun May 03 17:00:00 2020 PDT ] -DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000 +DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000 DEBUG: invalidation refresh on "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sat May 02 17:00:00 2020 PDT ] RESET client_min_messages; -- Compare the aggregate to the equivalent query on the source table @@ -148,6 +148,7 @@ CALL refresh_continuous_aggregate('daily_temp', '2020-05-01'::date, '2020-05-03' NOTICE: continuous aggregate "daily_temp" is already up-to-date -- Unbounded window forward in time CALL 
refresh_continuous_aggregate('daily_temp', '2020-05-03', NULL); +NOTICE: continuous aggregate "daily_temp" is already up-to-date CALL refresh_continuous_aggregate('daily_temp', NULL, NULL); -- Unbounded window back in time CALL refresh_continuous_aggregate('daily_temp', NULL, '2020-05-01'); diff --git a/tsl/test/expected/continuous_aggs_union_view-11.out b/tsl/test/expected/continuous_aggs_union_view-11.out index 9aed030af..42aa83f44 100644 --- a/tsl/test/expected/continuous_aggs_union_view-11.out +++ b/tsl/test/expected/continuous_aggs_union_view-11.out @@ -766,11 +766,17 @@ FROM timestamptz_table GROUP BY 1; -- Refresh first without data CALL refresh_continuous_aggregate('int_agg', NULL, NULL); +NOTICE: continuous aggregate "int_agg" is already up-to-date CALL refresh_continuous_aggregate('smallint_agg', NULL, NULL); +NOTICE: continuous aggregate "smallint_agg" is already up-to-date CALL refresh_continuous_aggregate('bigint_agg', NULL, NULL); +NOTICE: continuous aggregate "bigint_agg" is already up-to-date CALL refresh_continuous_aggregate('date_agg', NULL, NULL); +NOTICE: continuous aggregate "date_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamp_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamp_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamptz_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamptz_agg" is already up-to-date -- Watermarks at min for the above caggs: SELECT user_view_name, _timescaledb_internal.cagg_watermark(mat_hypertable_id) FROM _timescaledb_catalog.continuous_agg diff --git a/tsl/test/expected/continuous_aggs_union_view-12.out b/tsl/test/expected/continuous_aggs_union_view-12.out index 42b0f9d8c..b7b70c251 100644 --- a/tsl/test/expected/continuous_aggs_union_view-12.out +++ b/tsl/test/expected/continuous_aggs_union_view-12.out @@ -769,11 +769,17 @@ FROM timestamptz_table GROUP BY 1; -- Refresh first without data CALL refresh_continuous_aggregate('int_agg', NULL, NULL); +NOTICE: 
continuous aggregate "int_agg" is already up-to-date CALL refresh_continuous_aggregate('smallint_agg', NULL, NULL); +NOTICE: continuous aggregate "smallint_agg" is already up-to-date CALL refresh_continuous_aggregate('bigint_agg', NULL, NULL); +NOTICE: continuous aggregate "bigint_agg" is already up-to-date CALL refresh_continuous_aggregate('date_agg', NULL, NULL); +NOTICE: continuous aggregate "date_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamp_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamp_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamptz_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamptz_agg" is already up-to-date -- Watermarks at min for the above caggs: SELECT user_view_name, _timescaledb_internal.cagg_watermark(mat_hypertable_id) FROM _timescaledb_catalog.continuous_agg diff --git a/tsl/test/sql/continuous_aggs_invalidation.sql b/tsl/test/sql/continuous_aggs_invalidation.sql index 07898336e..7b86bf112 100644 --- a/tsl/test/sql/continuous_aggs_invalidation.sql +++ b/tsl/test/sql/continuous_aggs_invalidation.sql @@ -15,22 +15,22 @@ SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10); CREATE TABLE measurements (time int NOT NULL, device int, temp float); SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10); -CREATE OR REPLACE FUNCTION cond_now() +CREATE OR REPLACE FUNCTION bigint_now() RETURNS bigint LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM conditions $$; -CREATE OR REPLACE FUNCTION measure_now() +CREATE OR REPLACE FUNCTION int_now() RETURNS int LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM measurements $$; -SELECT set_integer_now_func('conditions', 'cond_now'); -SELECT set_integer_now_func('measurements', 'measure_now'); +SELECT set_integer_now_func('conditions', 'bigint_now'); +SELECT set_integer_now_func('measurements', 'int_now'); INSERT INTO conditions SELECT t, 
ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int, @@ -515,3 +515,135 @@ INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0); CALL refresh_continuous_aggregate('cond_1', 0, 3); SELECT * FROM cond_1 ORDER BY 1,2; + +---------------------------------------------- +-- Test that invalidation threshold is capped +---------------------------------------------- +CREATE table threshold_test (time int, value int); +SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); +SELECT set_integer_now_func('threshold_test', 'int_now'); + +CREATE MATERIALIZED VIEW thresh_2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1; + +SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'thresh_2' \gset + +-- There's no invalidation threshold initially +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Test that threshold is initilized to min value when there's no data +-- and we specify an infinite end. Note that the min value may differ +-- depending on time type. +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +INSERT INTO threshold_test +SELECT v, v FROM generate_series(1, 10) v; + +CALL refresh_continuous_aggregate('thresh_2', 0, 5); + +-- Threshold should move to end of refresh window (note that window +-- expands to end of bucket). 
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Refresh where both the start and end of the window is above the +-- max data value +CALL refresh_continuous_aggregate('thresh_2', 14, NULL); + +SELECT watermark AS thresh_hyper_id_watermark +FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id \gset + +-- Refresh where we start from the current watermark to infinity +CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); + +-- Now refresh with max end of the window to test that the +-- invalidation threshold is capped at the last bucket of data +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Should not have processed invalidations beyond the invalidation +-- threshold. +SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; + +-- Check that things are properly materialized +SELECT * FROM thresh_2 +ORDER BY 1; + +-- Delete the last data +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('threshold_test') +ORDER BY 1 DESC +LIMIT 1 \gset + +DELETE FROM threshold_test +WHERE time > 6; + +-- The last data in the hypertable is gone +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 +ORDER BY 1; + +-- The aggregate still holds data +SELECT * FROM thresh_2 +ORDER BY 1; + +-- Refresh the aggregate to bring it up-to-date +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +-- Data also gone from the aggregate +SELECT * FROM thresh_2 +ORDER BY 1; + +-- The invalidation threshold remains the same +SELECT * FROM 
_timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Insert new data beyond the invalidation threshold to move it +-- forward +INSERT INTO threshold_test +SELECT v, v FROM generate_series(7, 15) v; + +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +-- Aggregate now updated to reflect newly aggregated data +SELECT * FROM thresh_2 +ORDER BY 1; + +-- The invalidation threshold should have moved forward to the end of +-- the new data +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- The aggregate remains invalid beyond the invalidation threshold +SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; diff --git a/tsl/test/src/test_continuous_aggs.c b/tsl/test/src/test_continuous_aggs.c index 816b9265e..7a8626acf 100644 --- a/tsl/test/src/test_continuous_aggs.c +++ b/tsl/test/src/test_continuous_aggs.c @@ -55,7 +55,7 @@ ts_run_continuous_agg_materialization(PG_FUNCTION_ARGS) if (partial_view.name == NULL) elog(ERROR, "view cannot be NULL"); - invalidation_threshold = continuous_agg_invalidation_threshold_get(hypertable_id); + invalidation_threshold = invalidation_threshold_get(hypertable_id); completed_threshold = ts_continuous_agg_get_completed_threshold(materialization_id); if (lmv > completed_threshold)