diff --git a/src/continuous_agg.c b/src/continuous_agg.c index 13fe0879e..af784017e 100644 --- a/src/continuous_agg.c +++ b/src/continuous_agg.c @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -1020,7 +1019,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) { const int32 hyper_id = PG_GETARG_INT32(0); ContinuousAgg *cagg; - StringInfo command; Hypertable *ht; Dimension *dim; Datum maxdat; @@ -1028,7 +1026,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) int64 watermark; Oid timetype; AclResult aclresult; - int res; if (PG_ARGISNULL(0)) ereport(ERROR, @@ -1051,34 +1048,7 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) Assert(NULL != ht); dim = hyperspace_get_open_dimension(ht->space, 0); timetype = ts_dimension_get_partition_type(dim); - - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "could not connect to SPI"); - - /* Query for the last bucket in the materialized hypertable */ - command = makeStringInfo(); - appendStringInfo(command, - "SELECT max(%s) FROM %s.%s", - quote_identifier(NameStr(dim->fd.column_name)), - quote_identifier(NameStr(ht->fd.schema_name)), - quote_identifier(NameStr(ht->fd.table_name))); - - res = SPI_execute_with_args(command->data, - 0 /*=nargs*/, - NULL, - NULL, - NULL /*=Nulls*/, - true /*=read_only*/, - 0 /*count*/); - if (res < 0) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - (errmsg("could not find the maximum time value for hypertable"), - errdetail("SPI error when calculating continuous aggregate watermark: %d.", - res)))); - - Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype); - maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull); + maxdat = ts_hypertable_get_open_dim_max_value(ht, 0, &max_isnull); if (!max_isnull) { @@ -1092,8 +1062,5 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS) watermark = ts_time_get_min(timetype); } - res = SPI_finish(); - Assert(res == SPI_OK_FINISH); - PG_RETURN_INT64(watermark); } diff --git a/src/hypertable.c 
b/src/hypertable.c index f5848e3eb..9d7ea73d0 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -2681,3 +2682,51 @@ ts_hypertable_func_call_on_data_nodes(Hypertable *ht, FunctionCallInfo fcinfo) if (hypertable_is_distributed(ht)) ts_cm_functions->func_call_on_data_nodes(fcinfo, ts_hypertable_get_data_node_name_list(ht)); } + +/* + * Get the max value of an open dimension. + */ +Datum +ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull) +{ + StringInfo command; + Dimension *dim; + int res; + bool max_isnull; + Datum maxdat; + + dim = hyperspace_get_open_dimension(ht->space, dimension_index); + + if (NULL == dim) + elog(ERROR, "invalid open dimension index %d", dimension_index); + + /* Query for the max value of the open dimension's column in the hypertable */ + command = makeStringInfo(); + appendStringInfo(command, + "SELECT max(%s) FROM %s.%s", + quote_identifier(NameStr(dim->fd.column_name)), + quote_identifier(NameStr(ht->fd.schema_name)), + quote_identifier(NameStr(ht->fd.table_name))); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "could not connect to SPI"); + + res = SPI_execute(command->data, true /* read_only */, 0 /*count*/); + + if (res < 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + (errmsg("could not find the maximum time value for hypertable \"%s\"", + get_rel_name(ht->main_table_relid))))); + + Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == ts_dimension_get_partition_type(dim)); + maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull); + + if (isnull) + *isnull = max_isnull; + + res = SPI_finish(); + Assert(res == SPI_OK_FINISH); + + return maxdat; +} diff --git a/src/hypertable.h b/src/hypertable.h index 7c0f7b544..d319b49f1 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -156,6 +156,8 @@ extern TSDLLEXPORT void ts_hypertable_func_call_on_data_nodes(Hypertable *ht, 
FunctionCallInfo fcinfo); extern TSDLLEXPORT int16 ts_validate_replication_factor(int32 replication_factor, bool is_null, bool is_dist_call); +extern TSDLLEXPORT Datum ts_hypertable_get_open_dim_max_value(const Hypertable *ht, + int dimension_index, bool *isnull); #define hypertable_scan(schema, table, tuple_found, data, lockmode, tuplock) \ ts_hypertable_scan_with_memory_context(schema, \ diff --git a/src/time_utils.c b/src/time_utils.c index 140292814..c504f6cec 100644 --- a/src/time_utils.c +++ b/src/time_utils.c @@ -436,6 +436,15 @@ ts_time_get_noend(Oid timetype) return ts_time_get_noend(coerce_to_time_type(timetype)); } +int64 +ts_time_get_noend_or_max(Oid timetype) +{ + if (TS_TIME_IS_INTEGER_TIME(timetype)) + return ts_time_get_max(timetype); + + return ts_time_get_noend(timetype); +} + /* * Add an interval to a time value in a saturating way. * diff --git a/src/time_utils.h b/src/time_utils.h index 25777b2eb..82d8b3682 100644 --- a/src/time_utils.h +++ b/src/time_utils.h @@ -54,6 +54,10 @@ (TS_TIME_IS_INTEGER_TIME(type) || TS_TIME_DATUM_IS_NOBEGIN(timeval, type) || \ TS_TIME_DATUM_IS_NOEND(timeval, type)) +#define TS_TIME_IS_MIN(timeval, type) (timeval == ts_time_get_min(type)) +#define TS_TIME_IS_MAX(timeval, type) (timeval == ts_time_get_max(type)) +#define TS_TIME_IS_END(timeval, type) \ + (!TS_TIME_IS_INTEGER_TIME(type) && timeval == ts_time_get_end(type)) #define TS_TIME_IS_NOBEGIN(timeval, type) \ (!TS_TIME_IS_INTEGER_TIME(type) && timeval == ts_time_get_nobegin(type)) #define TS_TIME_IS_NOEND(timeval, type) \ @@ -76,6 +80,7 @@ extern TSDLLEXPORT int64 ts_time_get_end(Oid timetype); extern TSDLLEXPORT int64 ts_time_get_end_or_max(Oid timetype); extern TSDLLEXPORT int64 ts_time_get_nobegin(Oid timetype); extern TSDLLEXPORT int64 ts_time_get_noend(Oid timetype); +extern TSDLLEXPORT int64 ts_time_get_noend_or_max(Oid timetype); extern TSDLLEXPORT int64 ts_time_saturating_add(int64 timeval, int64 interval, Oid timetype); extern TSDLLEXPORT int64 
ts_time_saturating_sub(int64 timeval, int64 interval, Oid timetype); diff --git a/tsl/src/continuous_aggs/invalidation_threshold.c b/tsl/src/continuous_aggs/invalidation_threshold.c index 010d9a0d8..578805a8c 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.c +++ b/tsl/src/continuous_aggs/invalidation_threshold.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,8 @@ #include #include #include +#include +#include #include "continuous_agg.h" #include "continuous_aggs/materialize.h" @@ -84,11 +87,12 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) else { elog(DEBUG1, - "hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT + "hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT " " INT64_FORMAT, form->hypertable_id, form->watermark, invthresh->threshold); + invthresh->threshold = form->watermark; } if (should_free) @@ -97,12 +101,17 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) return SCAN_DONE; } -/* every cont. agg calculates its invalidation_threshold point based on its - *refresh_lag etc. We update the raw hypertable's invalidation threshold - * only if this new value is greater than the existsing one. +/* + * Set a new invalidation threshold. + * + * The threshold is only updated if the new threshold is greater than the old + * one. + * + * On success, the new threshold is returned, otherwise the existing threshold + * is returned instead. 
*/ -bool -continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold) +int64 +invalidation_threshold_set_or_get(int32 raw_hypertable_id, int64 invalidation_threshold) { bool threshold_found; InvalidationThresholdData data = { @@ -156,10 +165,9 @@ continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalid ts_catalog_insert_values(rel, desc, values, nulls); table_close(rel, NoLock); - data.was_updated = true; } - return data.was_updated; + return data.threshold; } static ScanTupleResult @@ -177,7 +185,7 @@ invalidation_threshold_tuple_found(TupleInfo *ti, void *data) } int64 -continuous_agg_invalidation_threshold_get(int32 hypertable_id) +invalidation_threshold_get(int32 hypertable_id) { int64 threshold = 0; ScanKeyData scankey[1]; @@ -220,7 +228,7 @@ invalidation_threshold_htid_found(TupleInfo *tinfo, void *data) * block till lock is acquired. */ void -continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id) +invalidation_threshold_lock(int32 raw_hypertable_id) { ScanTupLock scantuplock = { .waitpolicy = LockWaitBlock, @@ -259,3 +267,47 @@ continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id) errmsg("found multiple invalidation rows for hypertable %d", raw_hypertable_id))); } } + +/* + * Compute a new invalidation threshold. + * + * The new invalidation threshold returned is the end of the given refresh + * window, unless it ends at "infinity" in which case the threshold is capped + * at the end of the last bucket that holds data in the raw hypertable. 
+ */ +int64 +invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window) +{ + bool max_refresh = false; + Hypertable *ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id); + + if (TS_TIME_IS_INTEGER_TIME(refresh_window->type)) + max_refresh = TS_TIME_IS_MAX(refresh_window->end, refresh_window->type); + else + max_refresh = TS_TIME_IS_END(refresh_window->end, refresh_window->type) || + TS_TIME_IS_NOEND(refresh_window->end, refresh_window->type); + + if (max_refresh) + { + bool isnull; + Datum maxdat = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull); + + if (isnull) + { + /* No data in hypertable, so return min (start of time) */ + return ts_time_get_min(refresh_window->type); + } + else + { + int64 maxval = ts_time_value_to_internal(maxdat, refresh_window->type); + int64 bucket_start = + ts_time_bucket_by_type(cagg->data.bucket_width, maxval, refresh_window->type); + /* Add one bucket to get to the end of the last bucket */ + return ts_time_saturating_add(bucket_start, + cagg->data.bucket_width, + refresh_window->type); + } + } + + return refresh_window->end; +} diff --git a/tsl/src/continuous_aggs/invalidation_threshold.h b/tsl/src/continuous_aggs/invalidation_threshold.h index 483bd20fe..8ab6940fa 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.h +++ b/tsl/src/continuous_aggs/invalidation_threshold.h @@ -8,9 +8,14 @@ #include -extern int64 continuous_agg_invalidation_threshold_get(int32 hypertable_id); -extern bool continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, - int64 invalidation_threshold); -extern void continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id); +typedef struct InternalTimeRange InternalTimeRange; +typedef struct ContinuousAgg ContinuousAgg; + +extern int64 invalidation_threshold_get(int32 hypertable_id); +extern int64 invalidation_threshold_set_or_get(int32 raw_hypertable_id, + int64 invalidation_threshold); +extern void invalidation_threshold_lock(int32 
raw_hypertable_id); +extern int64 invalidation_threshold_compute(const ContinuousAgg *cagg, + const InternalTimeRange *refresh_window); #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_H */ diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 9b4f7d21c..749038765 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -222,7 +222,7 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op * (github issue 1940) * Prevent this by serializing on the raw hypertable row */ - continuous_agg_invalidation_threshold_lock(cagg_data.raw_hypertable_id); + invalidation_threshold_lock(cagg_data.raw_hypertable_id); drain_invalidation_log(cagg_data.raw_hypertable_id, &invalidations); materialization_invalidation_log_table_relation = table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG), @@ -292,8 +292,8 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op { LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), AccessExclusiveLock); - continuous_agg_invalidation_threshold_set(cagg_data.raw_hypertable_id, - materialization_invalidation_threshold); + invalidation_threshold_set_or_get(cagg_data.raw_hypertable_id, + materialization_invalidation_threshold); } table_close(materialization_invalidation_log_table_relation, NoLock); diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 756fa1db2..31d07f1e4 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -253,9 +253,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) Dimension *time_dim; InternalTimeRange refresh_window = { .type = InvalidOid, - .start = PG_INT64_MIN, - .end = PG_INT64_MAX, }; + if (!OidIsValid(cagg_relid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid continuous aggregate"))); @@ -275,6 +274,7 @@ 
continuous_agg_refresh(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("relation \"%s\" is not a continuous aggregate", relname)))); } + cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id); Assert(cagg_ht != NULL); time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0); @@ -285,15 +285,28 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) refresh_window.start = ts_time_value_from_arg(PG_GETARG_DATUM(1), get_fn_expr_argtype(fcinfo->flinfo, 1), refresh_window.type); + else + refresh_window.start = ts_time_get_min(refresh_window.type); if (!PG_ARGISNULL(2)) refresh_window.end = ts_time_value_from_arg(PG_GETARG_DATUM(2), get_fn_expr_argtype(fcinfo->flinfo, 2), refresh_window.type); + else + refresh_window.end = ts_time_get_noend_or_max(refresh_window.type); + continuous_agg_refresh_internal(cagg, &refresh_window); PG_RETURN_VOID(); } +static void +emit_up_to_date_notice(const ContinuousAgg *cagg) +{ + elog(NOTICE, + "continuous aggregate \"%s\" is already up-to-date", + NameStr(cagg->data.user_view_name)); +} + void continuous_agg_refresh_internal(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window_arg) @@ -303,6 +316,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, int32 mat_id = cagg->data.mat_hypertable_id; InternalTimeRange refresh_window; InvalidationStore *invalidations = NULL; + int64 computed_invalidation_threshold; + int64 invalidation_threshold; PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME); @@ -342,7 +357,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, */ LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), AccessExclusiveLock); - continuous_agg_invalidation_threshold_set(cagg->data.raw_hypertable_id, refresh_window.end); + + /* Compute new invalidation threshold. Note that this computation caps the + * threshold at the end of the last bucket that holds data in the + * underlying hypertable. 
*/ + computed_invalidation_threshold = invalidation_threshold_compute(cagg, &refresh_window); + + /* Set the new invalidation threshold. Note that this only updates the + * threshold if the new value is greater than the old one. Otherwise, the + * existing threshold is returned. */ + invalidation_threshold = invalidation_threshold_set_or_get(cagg->data.raw_hypertable_id, + computed_invalidation_threshold); + + /* We must also cap the refresh window at the invalidation threshold. If + * we process invalidations after the threshold, the continuous aggregates + * won't be refreshed when the threshold is moved forward in the + * future. The invalidation threshold should already be aligned on bucket + * boundary. */ + if (refresh_window.end > invalidation_threshold) + { + refresh_window.end = invalidation_threshold; + + /* Capping the end might have made the window 0, or negative, so + * nothing to refresh in that case */ + if (refresh_window.start >= refresh_window.end) + { + emit_up_to_date_notice(cagg); + return; + } + } + + /* Process invalidations in the hypertable invalidation log */ invalidation_process_hypertable_log(cagg, &refresh_window); /* Start a new transaction. 
Note that this invalidates previous memory @@ -370,7 +415,5 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, invalidation_store_free(invalidations); } else - elog(NOTICE, - "continuous aggregate \"%s\" is already up-to-date", - NameStr(cagg->data.user_view_name)); + emit_up_to_date_notice(cagg); } diff --git a/tsl/test/expected/continuous_aggs_invalidation.out b/tsl/test/expected/continuous_aggs_invalidation.out index 6d3fb135c..6b88005fa 100644 --- a/tsl/test/expected/continuous_aggs_invalidation.out +++ b/tsl/test/expected/continuous_aggs_invalidation.out @@ -26,25 +26,25 @@ SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10); (2,public,measurements,t) (1 row) -CREATE OR REPLACE FUNCTION cond_now() +CREATE OR REPLACE FUNCTION bigint_now() RETURNS bigint LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM conditions $$; -CREATE OR REPLACE FUNCTION measure_now() +CREATE OR REPLACE FUNCTION int_now() RETURNS int LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM measurements $$; -SELECT set_integer_now_func('conditions', 'cond_now'); +SELECT set_integer_now_func('conditions', 'bigint_now'); set_integer_now_func ---------------------- (1 row) -SELECT set_integer_now_func('measurements', 'measure_now'); +SELECT set_integer_now_func('measurements', 'int_now'); set_integer_now_func ---------------------- @@ -527,7 +527,7 @@ SELECT materialization_id AS cagg_id, cagg_id | start | end ---------+----------------------+---------------------- 3 | -9223372036854775808 | -9223372036854775801 - 3 | 9223372036854775807 | 9223372036854775807 + 3 | 110 | 9223372036854775807 4 | -9223372036854775808 | 19 4 | 15 | 42 4 | 20 | 25 @@ -851,11 +851,11 @@ SELECT materialization_id AS cagg_id, FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log WHERE materialization_id = :cond_1_id ORDER BY 1,2,3; - cagg_id | start | end ----------+---------------------+--------------------- - 6 | 0 | 0 - 6 | 2 | 2 - 6 | 
9223372036854775807 | 9223372036854775807 + cagg_id | start | end +---------+-------+--------------------- + 6 | 0 | 0 + 6 | 2 | 2 + 6 | 110 | 9223372036854775807 (3 rows) -- Refresh the two remaining invalidations @@ -894,3 +894,210 @@ ORDER BY 1,2; 2 | 1 | 3 (3 rows) +---------------------------------------------- +-- Test that invalidation threshold is capped +---------------------------------------------- +CREATE table threshold_test (time int, value int); +SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); +NOTICE: adding not-null constraint to column "time" + create_hypertable +----------------------------- + (7,public,threshold_test,t) +(1 row) + +SELECT set_integer_now_func('threshold_test', 'int_now'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW thresh_2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1; +SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'thresh_2' \gset +-- There's no invalidation threshold initially +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- Test that threshold is initilized to min value when there's no data +-- and we specify an infinite end. Note that the min value may differ +-- depending on time type. 
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) + +INSERT INTO threshold_test +SELECT v, v FROM generate_series(1, 10) v; +CALL refresh_continuous_aggregate('thresh_2', 0, 5); +-- Threshold should move to end of refresh window (note that window +-- expands to end of bucket). +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 6 +(1 row) + +-- Refresh where both the start and end of the window is above the +-- max data value +CALL refresh_continuous_aggregate('thresh_2', 14, NULL); +NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT watermark AS thresh_hyper_id_watermark +FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id \gset +-- Refresh where we start from the current watermark to infinity +CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); +NOTICE: continuous aggregate "thresh_2" is already up-to-date +-- Now refresh with max end of the window to test that the +-- invalidation threshold is capped at the last bucket of data +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Should not have processed invalidations beyond the invalidation +-- threshold. 
+SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 12 | 9223372036854775807 +(2 rows) + +-- Check that things are properly materialized +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Delete the last data +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('threshold_test') +ORDER BY 1 DESC +LIMIT 1 \gset +DELETE FROM threshold_test +WHERE time > 6; +-- The last data in the hypertable is gone +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The aggregate still holds data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Refresh the aggregate to bring it up-to-date +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Data also gone from the aggregate +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The invalidation threshold remains the same +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Insert new data beyond the invalidation threshold to move it +-- forward +INSERT INTO threshold_test +SELECT v, v FROM generate_series(7, 15) v; +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Aggregate now updated to reflect newly aggregated data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 
3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 11 + 12 | 13 + 14 | 15 +(8 rows) + +-- The invalidation threshold should have moved forward to the end of +-- the new data +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 16 +(1 row) + +-- The aggregate remains invalid beyond the invalidation threshold +SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 16 | 9223372036854775807 +(2 rows) + diff --git a/tsl/test/expected/continuous_aggs_refresh.out b/tsl/test/expected/continuous_aggs_refresh.out index 7a7c3b363..cb0df7492 100644 --- a/tsl/test/expected/continuous_aggs_refresh.out +++ b/tsl/test/expected/continuous_aggs_refresh.out @@ -83,7 +83,7 @@ ORDER BY day DESC, device; SET client_min_messages TO DEBUG1; CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03'); DEBUG: refreshing continuous aggregate "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sun May 03 17:00:00 2020 PDT ] -DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000 +DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000 DEBUG: invalidation refresh on "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sat May 02 17:00:00 2020 PDT ] RESET client_min_messages; -- Compare the aggregate to the equivalent query on the source table @@ -148,6 +148,7 @@ CALL refresh_continuous_aggregate('daily_temp', '2020-05-01'::date, '2020-05-03' NOTICE: continuous aggregate "daily_temp" is already up-to-date -- Unbounded window forward in time CALL 
refresh_continuous_aggregate('daily_temp', '2020-05-03', NULL); +NOTICE: continuous aggregate "daily_temp" is already up-to-date CALL refresh_continuous_aggregate('daily_temp', NULL, NULL); -- Unbounded window back in time CALL refresh_continuous_aggregate('daily_temp', NULL, '2020-05-01'); diff --git a/tsl/test/expected/continuous_aggs_union_view-11.out b/tsl/test/expected/continuous_aggs_union_view-11.out index 9aed030af..42aa83f44 100644 --- a/tsl/test/expected/continuous_aggs_union_view-11.out +++ b/tsl/test/expected/continuous_aggs_union_view-11.out @@ -766,11 +766,17 @@ FROM timestamptz_table GROUP BY 1; -- Refresh first without data CALL refresh_continuous_aggregate('int_agg', NULL, NULL); +NOTICE: continuous aggregate "int_agg" is already up-to-date CALL refresh_continuous_aggregate('smallint_agg', NULL, NULL); +NOTICE: continuous aggregate "smallint_agg" is already up-to-date CALL refresh_continuous_aggregate('bigint_agg', NULL, NULL); +NOTICE: continuous aggregate "bigint_agg" is already up-to-date CALL refresh_continuous_aggregate('date_agg', NULL, NULL); +NOTICE: continuous aggregate "date_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamp_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamp_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamptz_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamptz_agg" is already up-to-date -- Watermarks at min for the above caggs: SELECT user_view_name, _timescaledb_internal.cagg_watermark(mat_hypertable_id) FROM _timescaledb_catalog.continuous_agg diff --git a/tsl/test/expected/continuous_aggs_union_view-12.out b/tsl/test/expected/continuous_aggs_union_view-12.out index 42b0f9d8c..b7b70c251 100644 --- a/tsl/test/expected/continuous_aggs_union_view-12.out +++ b/tsl/test/expected/continuous_aggs_union_view-12.out @@ -769,11 +769,17 @@ FROM timestamptz_table GROUP BY 1; -- Refresh first without data CALL refresh_continuous_aggregate('int_agg', NULL, NULL); +NOTICE: 
continuous aggregate "int_agg" is already up-to-date CALL refresh_continuous_aggregate('smallint_agg', NULL, NULL); +NOTICE: continuous aggregate "smallint_agg" is already up-to-date CALL refresh_continuous_aggregate('bigint_agg', NULL, NULL); +NOTICE: continuous aggregate "bigint_agg" is already up-to-date CALL refresh_continuous_aggregate('date_agg', NULL, NULL); +NOTICE: continuous aggregate "date_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamp_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamp_agg" is already up-to-date CALL refresh_continuous_aggregate('timestamptz_agg', NULL, NULL); +NOTICE: continuous aggregate "timestamptz_agg" is already up-to-date -- Watermarks at min for the above caggs: SELECT user_view_name, _timescaledb_internal.cagg_watermark(mat_hypertable_id) FROM _timescaledb_catalog.continuous_agg diff --git a/tsl/test/sql/continuous_aggs_invalidation.sql b/tsl/test/sql/continuous_aggs_invalidation.sql index 07898336e..7b86bf112 100644 --- a/tsl/test/sql/continuous_aggs_invalidation.sql +++ b/tsl/test/sql/continuous_aggs_invalidation.sql @@ -15,22 +15,22 @@ SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10); CREATE TABLE measurements (time int NOT NULL, device int, temp float); SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10); -CREATE OR REPLACE FUNCTION cond_now() +CREATE OR REPLACE FUNCTION bigint_now() RETURNS bigint LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM conditions $$; -CREATE OR REPLACE FUNCTION measure_now() +CREATE OR REPLACE FUNCTION int_now() RETURNS int LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM measurements $$; -SELECT set_integer_now_func('conditions', 'cond_now'); -SELECT set_integer_now_func('measurements', 'measure_now'); +SELECT set_integer_now_func('conditions', 'bigint_now'); +SELECT set_integer_now_func('measurements', 'int_now'); INSERT INTO conditions SELECT t, 
ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int, @@ -515,3 +515,135 @@ INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0); CALL refresh_continuous_aggregate('cond_1', 0, 3); SELECT * FROM cond_1 ORDER BY 1,2; + +---------------------------------------------- +-- Test that invalidation threshold is capped +---------------------------------------------- +CREATE table threshold_test (time int, value int); +SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); +SELECT set_integer_now_func('threshold_test', 'int_now'); + +CREATE MATERIALIZED VIEW thresh_2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1; + +SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'thresh_2' \gset + +-- There's no invalidation threshold initially +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Test that threshold is initilized to min value when there's no data +-- and we specify an infinite end. Note that the min value may differ +-- depending on time type. +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +INSERT INTO threshold_test +SELECT v, v FROM generate_series(1, 10) v; + +CALL refresh_continuous_aggregate('thresh_2', 0, 5); + +-- Threshold should move to end of refresh window (note that window +-- expands to end of bucket). 
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Refresh where both the start and end of the window is above the +-- max data value +CALL refresh_continuous_aggregate('thresh_2', 14, NULL); + +SELECT watermark AS thresh_hyper_id_watermark +FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id \gset + +-- Refresh where we start from the current watermark to infinity +CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); + +-- Now refresh with max end of the window to test that the +-- invalidation threshold is capped at the last bucket of data +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Should not have processed invalidations beyond the invalidation +-- threshold. +SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; + +-- Check that things are properly materialized +SELECT * FROM thresh_2 +ORDER BY 1; + +-- Delete the last data +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('threshold_test') +ORDER BY 1 DESC +LIMIT 1 \gset + +DELETE FROM threshold_test +WHERE time > 6; + +-- The last data in the hypertable is gone +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 +ORDER BY 1; + +-- The aggregate still holds data +SELECT * FROM thresh_2 +ORDER BY 1; + +-- Refresh the aggregate to bring it up-to-date +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +-- Data also gone from the aggregate +SELECT * FROM thresh_2 +ORDER BY 1; + +-- The invalidation threshold remains the same +SELECT * FROM 
_timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- Insert new data beyond the invalidation threshold to move it +-- forward +INSERT INTO threshold_test +SELECT v, v FROM generate_series(7, 15) v; + +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); + +-- Aggregate now updated to reflect newly aggregated data +SELECT * FROM thresh_2 +ORDER BY 1; + +-- The invalidation threshold should have moved forward to the end of +-- the new data +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + +-- The aggregate remains invalid beyond the invalidation threshold +SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + WHERE materialization_id = :thresh_cagg_id + ORDER BY 1,2,3; diff --git a/tsl/test/src/test_continuous_aggs.c b/tsl/test/src/test_continuous_aggs.c index 816b9265e..7a8626acf 100644 --- a/tsl/test/src/test_continuous_aggs.c +++ b/tsl/test/src/test_continuous_aggs.c @@ -55,7 +55,7 @@ ts_run_continuous_agg_materialization(PG_FUNCTION_ARGS) if (partial_view.name == NULL) elog(ERROR, "view cannot be NULL"); - invalidation_threshold = continuous_agg_invalidation_threshold_get(hypertable_id); + invalidation_threshold = invalidation_threshold_get(hypertable_id); completed_threshold = ts_continuous_agg_get_completed_threshold(materialization_id); if (lmv > completed_threshold)