Cap invalidation threshold at last data bucket

When refreshing with an "infinite" refresh window going forward in
time, the invalidation threshold is also moved forward to the end of
the valid time range. This effectively renders the invalidation
threshold useless, leading to unnecessary write amplification.

To handle infinite refreshes better, this change caps the refresh
window at the end of the last bucket of data in the underlying
hypertable, so as not to move the invalidation threshold further than
necessary. For instance, if the max time value in the hypertable is
11, a refresh command such as:

```
CALL refresh_continuous_aggregate(NULL, NULL);
```
would be turned into
```
CALL refresh_continuous_aggregate(NULL, 20);
```

assuming that a bucket starts at 10 and ends at 20 (exclusive). Thus
the invalidation threshold would move to at most 20, allowing the
threshold to still do its work once time moves forward beyond that
point again.
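
As a minimal illustration of the capping arithmetic (assuming integer
time and the values from the example above), the capped end is the
start of the bucket containing the max time value plus one bucket
width, which is what the new invalidation_threshold_compute() computes
via ts_time_bucket_by_type() and ts_time_saturating_add():

```
-- Bucket width 10, max time value 11: the containing bucket starts at
-- 10, so its exclusive end (and thus the cap) is 20.
SELECT time_bucket(10, 11) + 10 AS capped_end;
```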

Note that one must never process invalidations beyond the invalidation
threshold without also moving it, since that would clear the
invalidations for that region and thus prevent it from being refreshed
once the invalidation threshold is moved forward. Therefore, if we do
not move the threshold further than a certain point, we cannot refresh
beyond it either. An alternative, and perhaps safer, approach would be
to always invalidate the region over which the invalidation threshold
is moved (i.e., new_threshold - old_threshold). However, that is left
for a future change.
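
The regression tests added below make this observable: after a capped
refresh, any invalidations at or beyond the threshold must remain in
the log. Roughly, the tests inspect the catalog like this (a sketch
using the psql variables set up in the test):

```
-- The threshold should be capped at the end of the last bucket with data
SELECT watermark
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id;

-- Invalidations beyond the threshold must still be logged so that the
-- region can be refreshed once the threshold moves forward
SELECT lowest_modified_value, greatest_modified_value
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :thresh_cagg_id;
```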

It would be possible to also cap non-infinite refreshes, e.g.,
refreshes that end at a higher time value than the max time value in
the hypertable. However, when an explicit end is specified, it might
be intentional, so optimizing this case is also left for the future.

Closes #2333
Author: Erik Nordström, 2020-09-07 17:07:43 +02:00 (committed by Erik Nordström)
Commit: f49492b83d (parent: 4f32439362)
15 changed files with 557 additions and 73 deletions

View File

@@ -15,7 +15,6 @@
#include <catalog/namespace.h>
#include <catalog/pg_trigger.h>
#include <commands/trigger.h>
#include <executor/spi.h>
#include <fmgr.h>
#include <storage/lmgr.h>
#include <utils/acl.h>
@@ -1020,7 +1019,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
{
const int32 hyper_id = PG_GETARG_INT32(0);
ContinuousAgg *cagg;
StringInfo command;
Hypertable *ht;
Dimension *dim;
Datum maxdat;
@@ -1028,7 +1026,6 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
int64 watermark;
Oid timetype;
AclResult aclresult;
int res;
if (PG_ARGISNULL(0))
ereport(ERROR,
@@ -1051,34 +1048,7 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
Assert(NULL != ht);
dim = hyperspace_get_open_dimension(ht->space, 0);
timetype = ts_dimension_get_partition_type(dim);
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");
/* Query for the last bucket in the materialized hypertable */
command = makeStringInfo();
appendStringInfo(command,
"SELECT max(%s) FROM %s.%s",
quote_identifier(NameStr(dim->fd.column_name)),
quote_identifier(NameStr(ht->fd.schema_name)),
quote_identifier(NameStr(ht->fd.table_name)));
res = SPI_execute_with_args(command->data,
0 /*=nargs*/,
NULL,
NULL,
NULL /*=Nulls*/,
true /*=read_only*/,
0 /*count*/);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not find the maximum time value for hypertable"),
errdetail("SPI error when calculating continuous aggregate watermark: %d.",
res))));
Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype);
maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull);
maxdat = ts_hypertable_get_open_dim_max_value(ht, 0, &max_isnull);
if (!max_isnull)
{
@@ -1092,8 +1062,5 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
watermark = ts_time_get_min(timetype);
}
res = SPI_finish();
Assert(res == SPI_OK_FINISH);
PG_RETURN_INT64(watermark);
}

View File

@@ -19,6 +19,7 @@
#include <commands/tablecmds.h>
#include <commands/tablespace.h>
#include <commands/trigger.h>
#include <executor/spi.h>
#include <funcapi.h>
#include <miscadmin.h>
#include <nodes/makefuncs.h>
@@ -2681,3 +2682,51 @@ ts_hypertable_func_call_on_data_nodes(Hypertable *ht, FunctionCallInfo fcinfo)
if (hypertable_is_distributed(ht))
ts_cm_functions->func_call_on_data_nodes(fcinfo, ts_hypertable_get_data_node_name_list(ht));
}
/*
* Get the max value of an open dimension.
*/
Datum
ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull)
{
StringInfo command;
Dimension *dim;
int res;
bool max_isnull;
Datum maxdat;
dim = hyperspace_get_open_dimension(ht->space, dimension_index);
if (NULL == dim)
elog(ERROR, "invalid open dimension index %d", dimension_index);
/* Query for the last bucket in the materialized hypertable */
command = makeStringInfo();
appendStringInfo(command,
"SELECT max(%s) FROM %s.%s",
quote_identifier(NameStr(dim->fd.column_name)),
quote_identifier(NameStr(ht->fd.schema_name)),
quote_identifier(NameStr(ht->fd.table_name)));
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");
res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("could not find the maximum time value for hypertable \"%s\"",
get_rel_name(ht->main_table_relid)))));
Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == ts_dimension_get_partition_type(dim));
maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &max_isnull);
if (isnull)
*isnull = max_isnull;
res = SPI_finish();
Assert(res == SPI_OK_FINISH);
return maxdat;
}

View File

@@ -156,6 +156,8 @@ extern TSDLLEXPORT void ts_hypertable_func_call_on_data_nodes(Hypertable *ht,
FunctionCallInfo fcinfo);
extern TSDLLEXPORT int16 ts_validate_replication_factor(int32 replication_factor, bool is_null,
bool is_dist_call);
extern TSDLLEXPORT Datum ts_hypertable_get_open_dim_max_value(const Hypertable *ht,
int dimension_index, bool *isnull);
#define hypertable_scan(schema, table, tuple_found, data, lockmode, tuplock) \
ts_hypertable_scan_with_memory_context(schema, \

View File

@@ -436,6 +436,15 @@ ts_time_get_noend(Oid timetype)
return ts_time_get_noend(coerce_to_time_type(timetype));
}
int64
ts_time_get_noend_or_max(Oid timetype)
{
if (TS_TIME_IS_INTEGER_TIME(timetype))
return ts_time_get_max(timetype);
return ts_time_get_noend(timetype);
}
/*
* Add an interval to a time value in a saturating way.
*

View File

@@ -54,6 +54,10 @@
(TS_TIME_IS_INTEGER_TIME(type) || TS_TIME_DATUM_IS_NOBEGIN(timeval, type) || \
TS_TIME_DATUM_IS_NOEND(timeval, type))
#define TS_TIME_IS_MIN(timeval, type) (timeval == ts_time_get_min(type))
#define TS_TIME_IS_MAX(timeval, type) (timeval == ts_time_get_max(type))
#define TS_TIME_IS_END(timeval, type) \
(!TS_TIME_IS_INTEGER_TIME(type) && timeval == ts_time_get_end(type))
#define TS_TIME_IS_NOBEGIN(timeval, type) \
(!TS_TIME_IS_INTEGER_TIME(type) && timeval == ts_time_get_nobegin(type))
#define TS_TIME_IS_NOEND(timeval, type) \
@@ -76,6 +80,7 @@ extern TSDLLEXPORT int64 ts_time_get_end(Oid timetype);
extern TSDLLEXPORT int64 ts_time_get_end_or_max(Oid timetype);
extern TSDLLEXPORT int64 ts_time_get_nobegin(Oid timetype);
extern TSDLLEXPORT int64 ts_time_get_noend(Oid timetype);
extern TSDLLEXPORT int64 ts_time_get_noend_or_max(Oid timetype);
extern TSDLLEXPORT int64 ts_time_saturating_add(int64 timeval, int64 interval, Oid timetype);
extern TSDLLEXPORT int64 ts_time_saturating_sub(int64 timeval, int64 interval, Oid timetype);

View File

@@ -10,6 +10,7 @@
#include <nodes/memnodes.h>
#include <storage/lockdefs.h>
#include <storage/lmgr.h>
#include <utils/builtins.h>
#include <utils/memutils.h>
#include <utils/snapmgr.h>
@@ -17,6 +18,8 @@
#include <scanner.h>
#include <scan_iterator.h>
#include <compat.h>
#include <time_utils.h>
#include <time_bucket.h>
#include "continuous_agg.h"
#include "continuous_aggs/materialize.h"
@@ -84,11 +87,12 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data)
else
{
elog(DEBUG1,
"hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT
"hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT
" " INT64_FORMAT,
form->hypertable_id,
form->watermark,
invthresh->threshold);
invthresh->threshold = form->watermark;
}
if (should_free)
@@ -97,12 +101,17 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data)
return SCAN_DONE;
}
/* every cont. agg calculates its invalidation_threshold point based on its
*refresh_lag etc. We update the raw hypertable's invalidation threshold
* only if this new value is greater than the existsing one.
/*
* Set a new invalidation threshold.
*
* The threshold is only updated if the new threshold is greater than the old
* one.
*
* On success, the new threshold is returned, otherwise the existing threshold
* is returned instead.
*/
bool
continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold)
int64
invalidation_threshold_set_or_get(int32 raw_hypertable_id, int64 invalidation_threshold)
{
bool threshold_found;
InvalidationThresholdData data = {
@@ -156,10 +165,9 @@ continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalid
ts_catalog_insert_values(rel, desc, values, nulls);
table_close(rel, NoLock);
data.was_updated = true;
}
return data.was_updated;
return data.threshold;
}
static ScanTupleResult
@@ -177,7 +185,7 @@ invalidation_threshold_tuple_found(TupleInfo *ti, void *data)
}
int64
continuous_agg_invalidation_threshold_get(int32 hypertable_id)
invalidation_threshold_get(int32 hypertable_id)
{
int64 threshold = 0;
ScanKeyData scankey[1];
@@ -220,7 +228,7 @@ invalidation_threshold_htid_found(TupleInfo *tinfo, void *data)
* block till lock is acquired.
*/
void
continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id)
invalidation_threshold_lock(int32 raw_hypertable_id)
{
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
@@ -259,3 +267,47 @@ continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id)
errmsg("found multiple invalidation rows for hypertable %d", raw_hypertable_id)));
}
}
/*
* Compute a new invalidation threshold.
*
* The new invalidation threshold returned is the end of the given refresh
* window, unless it ends at "infinity" in which case the threshold is capped
* at the end of the last bucket materialized.
*/
int64
invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window)
{
bool max_refresh = false;
Hypertable *ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id);
if (TS_TIME_IS_INTEGER_TIME(refresh_window->type))
max_refresh = TS_TIME_IS_MAX(refresh_window->end, refresh_window->type);
else
max_refresh = TS_TIME_IS_END(refresh_window->end, refresh_window->type) ||
TS_TIME_IS_NOEND(refresh_window->end, refresh_window->type);
if (max_refresh)
{
bool isnull;
Datum maxdat = ts_hypertable_get_open_dim_max_value(ht, 0, &isnull);
if (isnull)
{
/* No data in hypertable, so return min (start of time) */
return ts_time_get_min(refresh_window->type);
}
else
{
int64 maxval = ts_time_value_to_internal(maxdat, refresh_window->type);
int64 bucket_start =
ts_time_bucket_by_type(cagg->data.bucket_width, maxval, refresh_window->type);
/* Add one bucket to get to the end of the last bucket */
return ts_time_saturating_add(bucket_start,
cagg->data.bucket_width,
refresh_window->type);
}
}
return refresh_window->end;
}

View File

@@ -8,9 +8,14 @@
#include <postgres.h>
extern int64 continuous_agg_invalidation_threshold_get(int32 hypertable_id);
extern bool continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id,
int64 invalidation_threshold);
extern void continuous_agg_invalidation_threshold_lock(int32 raw_hypertable_id);
typedef struct InternalTimeRange InternalTimeRange;
typedef struct ContinuousAgg ContinuousAgg;
extern int64 invalidation_threshold_get(int32 hypertable_id);
extern int64 invalidation_threshold_set_or_get(int32 raw_hypertable_id,
int64 invalidation_threshold);
extern void invalidation_threshold_lock(int32 raw_hypertable_id);
extern int64 invalidation_threshold_compute(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_H */

View File

@@ -222,7 +222,7 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op
* (github issue 1940)
* Prevent this by serializing on the raw hypertable row
*/
continuous_agg_invalidation_threshold_lock(cagg_data.raw_hypertable_id);
invalidation_threshold_lock(cagg_data.raw_hypertable_id);
drain_invalidation_log(cagg_data.raw_hypertable_id, &invalidations);
materialization_invalidation_log_table_relation =
table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG),
@@ -292,8 +292,8 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op
{
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);
continuous_agg_invalidation_threshold_set(cagg_data.raw_hypertable_id,
materialization_invalidation_threshold);
invalidation_threshold_set_or_get(cagg_data.raw_hypertable_id,
materialization_invalidation_threshold);
}
table_close(materialization_invalidation_log_table_relation, NoLock);

View File

@@ -253,9 +253,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
Dimension *time_dim;
InternalTimeRange refresh_window = {
.type = InvalidOid,
.start = PG_INT64_MIN,
.end = PG_INT64_MAX,
};
if (!OidIsValid(cagg_relid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid continuous aggregate")));
@@ -275,6 +274,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errmsg("relation \"%s\" is not a continuous aggregate", relname))));
}
cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
Assert(cagg_ht != NULL);
time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0);
@@ -285,15 +285,28 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
refresh_window.start = ts_time_value_from_arg(PG_GETARG_DATUM(1),
get_fn_expr_argtype(fcinfo->flinfo, 1),
refresh_window.type);
else
refresh_window.start = ts_time_get_min(refresh_window.type);
if (!PG_ARGISNULL(2))
refresh_window.end = ts_time_value_from_arg(PG_GETARG_DATUM(2),
get_fn_expr_argtype(fcinfo->flinfo, 2),
refresh_window.type);
else
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);
continuous_agg_refresh_internal(cagg, &refresh_window);
PG_RETURN_VOID();
}
static void
emit_up_to_date_notice(const ContinuousAgg *cagg)
{
elog(NOTICE,
"continuous aggregate \"%s\" is already up-to-date",
NameStr(cagg->data.user_view_name));
}
void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg)
@@ -303,6 +316,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window;
InvalidationStore *invalidations = NULL;
int64 computed_invalidation_threshold;
int64 invalidation_threshold;
PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME);
@@ -342,7 +357,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
*/
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);
continuous_agg_invalidation_threshold_set(cagg->data.raw_hypertable_id, refresh_window.end);
/* Compute new invalidation threshold. Note that this computation caps the
* threshold at the end of the last bucket that holds data in the
* underlying hypertable. */
computed_invalidation_threshold = invalidation_threshold_compute(cagg, &refresh_window);
/* Set the new invalidation threshold. Note that this only updates the
* threshold if the new value is greater than the old one. Otherwise, the
* existing threshold is returned. */
invalidation_threshold = invalidation_threshold_set_or_get(cagg->data.raw_hypertable_id,
computed_invalidation_threshold);
/* We must also cap the refresh window at the invalidation threshold. If
* we process invalidations after the threshold, the continuous aggregates
* won't be refreshed when the threshold is moved forward in the
* future. The invalidation threshold should already be aligned on bucket
* boundary. */
if (refresh_window.end > invalidation_threshold)
{
refresh_window.end = invalidation_threshold;
/* Capping the end might have made the window 0, or negative, so
* nothing to refresh in that case */
if (refresh_window.start >= refresh_window.end)
{
emit_up_to_date_notice(cagg);
return;
}
}
/* Process invalidations in the hypertable invalidation log */
invalidation_process_hypertable_log(cagg, &refresh_window);
/* Start a new transaction. Note that this invalidates previous memory
@@ -370,7 +415,5 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
invalidation_store_free(invalidations);
}
else
elog(NOTICE,
"continuous aggregate \"%s\" is already up-to-date",
NameStr(cagg->data.user_view_name));
emit_up_to_date_notice(cagg);
}

View File

@@ -26,25 +26,25 @@ SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10);
(2,public,measurements,t)
(1 row)
CREATE OR REPLACE FUNCTION cond_now()
CREATE OR REPLACE FUNCTION bigint_now()
RETURNS bigint LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM conditions
$$;
CREATE OR REPLACE FUNCTION measure_now()
CREATE OR REPLACE FUNCTION int_now()
RETURNS int LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM measurements
$$;
SELECT set_integer_now_func('conditions', 'cond_now');
SELECT set_integer_now_func('conditions', 'bigint_now');
set_integer_now_func
----------------------
(1 row)
SELECT set_integer_now_func('measurements', 'measure_now');
SELECT set_integer_now_func('measurements', 'int_now');
set_integer_now_func
----------------------
@@ -527,7 +527,7 @@ SELECT materialization_id AS cagg_id,
cagg_id | start | end
---------+----------------------+----------------------
3 | -9223372036854775808 | -9223372036854775801
3 | 9223372036854775807 | 9223372036854775807
3 | 110 | 9223372036854775807
4 | -9223372036854775808 | 19
4 | 15 | 42
4 | 20 | 25
@@ -851,11 +851,11 @@ SELECT materialization_id AS cagg_id,
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :cond_1_id
ORDER BY 1,2,3;
cagg_id | start | end
---------+---------------------+---------------------
6 | 0 | 0
6 | 2 | 2
6 | 9223372036854775807 | 9223372036854775807
cagg_id | start | end
---------+-------+---------------------
6 | 0 | 0
6 | 2 | 2
6 | 110 | 9223372036854775807
(3 rows)
-- Refresh the two remaining invalidations
@@ -894,3 +894,210 @@ ORDER BY 1,2;
2 | 1 | 3
(3 rows)
----------------------------------------------
-- Test that invalidation threshold is capped
----------------------------------------------
CREATE table threshold_test (time int, value int);
SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4);
NOTICE: adding not-null constraint to column "time"
create_hypertable
-----------------------------
(7,public,threshold_test,t)
(1 row)
SELECT set_integer_now_func('threshold_test', 'int_now');
set_integer_now_func
----------------------
(1 row)
CREATE MATERIALIZED VIEW thresh_2
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket(2, time) AS bucket, max(value) AS max
FROM threshold_test
GROUP BY 1;
SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'thresh_2' \gset
-- There's no invalidation threshold initially
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
(0 rows)
-- Test that threshold is initialized to min value when there's no data
-- and we specify an infinite end. Note that the min value may differ
-- depending on time type.
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
NOTICE: continuous aggregate "thresh_2" is already up-to-date
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-------------
7 | -2147483648
(1 row)
INSERT INTO threshold_test
SELECT v, v FROM generate_series(1, 10) v;
CALL refresh_continuous_aggregate('thresh_2', 0, 5);
-- Threshold should move to end of refresh window (note that window
-- expands to end of bucket).
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
7 | 6
(1 row)
-- Refresh where both the start and end of the window are above the
-- max data value
CALL refresh_continuous_aggregate('thresh_2', 14, NULL);
NOTICE: continuous aggregate "thresh_2" is already up-to-date
SELECT watermark AS thresh_hyper_id_watermark
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id \gset
-- Refresh where we start from the current watermark to infinity
CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL);
NOTICE: continuous aggregate "thresh_2" is already up-to-date
-- Now refresh with max end of the window to test that the
-- invalidation threshold is capped at the last bucket of data
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
7 | 12
(1 row)
-- Should not have processed invalidations beyond the invalidation
-- threshold.
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
greatest_modified_value AS end
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :thresh_cagg_id
ORDER BY 1,2,3;
cagg_id | start | end
---------+----------------------+---------------------
8 | -9223372036854775808 | -1
8 | 12 | 9223372036854775807
(2 rows)
-- Check that things are properly materialized
SELECT * FROM thresh_2
ORDER BY 1;
bucket | max
--------+-----
0 | 1
2 | 3
4 | 5
6 | 7
8 | 9
10 | 10
(6 rows)
-- Delete the last data
SELECT show_chunks AS chunk_to_drop
FROM show_chunks('threshold_test')
ORDER BY 1 DESC
LIMIT 1 \gset
DELETE FROM threshold_test
WHERE time > 6;
-- The last data in the hypertable is gone
SELECT time_bucket(2, time) AS bucket, max(value) AS max
FROM threshold_test
GROUP BY 1
ORDER BY 1;
bucket | max
--------+-----
0 | 1
2 | 3
4 | 5
6 | 6
(4 rows)
-- The aggregate still holds data
SELECT * FROM thresh_2
ORDER BY 1;
bucket | max
--------+-----
0 | 1
2 | 3
4 | 5
6 | 7
8 | 9
10 | 10
(6 rows)
-- Refresh the aggregate to bring it up-to-date
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
-- Data also gone from the aggregate
SELECT * FROM thresh_2
ORDER BY 1;
bucket | max
--------+-----
0 | 1
2 | 3
4 | 5
6 | 6
(4 rows)
-- The invalidation threshold remains the same
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
7 | 12
(1 row)
-- Insert new data beyond the invalidation threshold to move it
-- forward
INSERT INTO threshold_test
SELECT v, v FROM generate_series(7, 15) v;
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
-- Aggregate now updated to reflect newly aggregated data
SELECT * FROM thresh_2
ORDER BY 1;
bucket | max
--------+-----
0 | 1
2 | 3
4 | 5
6 | 7
8 | 9
10 | 11
12 | 13
14 | 15
(8 rows)
-- The invalidation threshold should have moved forward to the end of
-- the new data
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
7 | 16
(1 row)
-- The aggregate remains invalid beyond the invalidation threshold
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
greatest_modified_value AS end
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :thresh_cagg_id
ORDER BY 1,2,3;
cagg_id | start | end
---------+----------------------+---------------------
8 | -9223372036854775808 | -1
8 | 16 | 9223372036854775807
(2 rows)

View File

@@ -83,7 +83,7 @@ ORDER BY day DESC, device;
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03');
DEBUG: refreshing continuous aggregate "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sun May 03 17:00:00 2020 PDT ]
DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000
DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000
DEBUG: invalidation refresh on "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sat May 02 17:00:00 2020 PDT ]
RESET client_min_messages;
-- Compare the aggregate to the equivalent query on the source table
@@ -148,6 +148,7 @@ CALL refresh_continuous_aggregate('daily_temp', '2020-05-01'::date, '2020-05-03'
NOTICE: continuous aggregate "daily_temp" is already up-to-date
-- Unbounded window forward in time
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', NULL);
NOTICE: continuous aggregate "daily_temp" is already up-to-date
CALL refresh_continuous_aggregate('daily_temp', NULL, NULL);
-- Unbounded window back in time
CALL refresh_continuous_aggregate('daily_temp', NULL, '2020-05-01');

View File

@@ -766,11 +766,17 @@ FROM timestamptz_table
GROUP BY 1;
-- Refresh first without data
CALL refresh_continuous_aggregate('int_agg', NULL, NULL);
NOTICE: continuous aggregate "int_agg" is already up-to-date
CALL refresh_continuous_aggregate('smallint_agg', NULL, NULL);
NOTICE: continuous aggregate "smallint_agg" is already up-to-date
CALL refresh_continuous_aggregate('bigint_agg', NULL, NULL);
NOTICE: continuous aggregate "bigint_agg" is already up-to-date
CALL refresh_continuous_aggregate('date_agg', NULL, NULL);
NOTICE: continuous aggregate "date_agg" is already up-to-date
CALL refresh_continuous_aggregate('timestamp_agg', NULL, NULL);
NOTICE: continuous aggregate "timestamp_agg" is already up-to-date
CALL refresh_continuous_aggregate('timestamptz_agg', NULL, NULL);
NOTICE: continuous aggregate "timestamptz_agg" is already up-to-date
-- Watermarks at min for the above caggs:
SELECT user_view_name, _timescaledb_internal.cagg_watermark(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg

View File

@@ -769,11 +769,17 @@ FROM timestamptz_table
GROUP BY 1;
-- Refresh first without data
CALL refresh_continuous_aggregate('int_agg', NULL, NULL);
NOTICE: continuous aggregate "int_agg" is already up-to-date
CALL refresh_continuous_aggregate('smallint_agg', NULL, NULL);
NOTICE: continuous aggregate "smallint_agg" is already up-to-date
CALL refresh_continuous_aggregate('bigint_agg', NULL, NULL);
NOTICE: continuous aggregate "bigint_agg" is already up-to-date
CALL refresh_continuous_aggregate('date_agg', NULL, NULL);
NOTICE: continuous aggregate "date_agg" is already up-to-date
CALL refresh_continuous_aggregate('timestamp_agg', NULL, NULL);
NOTICE: continuous aggregate "timestamp_agg" is already up-to-date
CALL refresh_continuous_aggregate('timestamptz_agg', NULL, NULL);
NOTICE: continuous aggregate "timestamptz_agg" is already up-to-date
-- Watermarks at min for the above caggs:
SELECT user_view_name, _timescaledb_internal.cagg_watermark(mat_hypertable_id)
FROM _timescaledb_catalog.continuous_agg

View File

@@ -15,22 +15,22 @@ SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10);
CREATE TABLE measurements (time int NOT NULL, device int, temp float);
SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10);
CREATE OR REPLACE FUNCTION cond_now()
CREATE OR REPLACE FUNCTION bigint_now()
RETURNS bigint LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM conditions
$$;
CREATE OR REPLACE FUNCTION measure_now()
CREATE OR REPLACE FUNCTION int_now()
RETURNS int LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM measurements
$$;
SELECT set_integer_now_func('conditions', 'cond_now');
SELECT set_integer_now_func('measurements', 'measure_now');
SELECT set_integer_now_func('conditions', 'bigint_now');
SELECT set_integer_now_func('measurements', 'int_now');
INSERT INTO conditions
SELECT t, ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int,
@@ -515,3 +515,135 @@ INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0);
CALL refresh_continuous_aggregate('cond_1', 0, 3);
SELECT * FROM cond_1
ORDER BY 1,2;
----------------------------------------------
-- Test that invalidation threshold is capped
----------------------------------------------
CREATE table threshold_test (time int, value int);
SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4);
SELECT set_integer_now_func('threshold_test', 'int_now');
CREATE MATERIALIZED VIEW thresh_2
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket(2, time) AS bucket, max(value) AS max
FROM threshold_test
GROUP BY 1;
SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id
FROM _timescaledb_catalog.continuous_agg
WHERE user_view_name = 'thresh_2' \gset
-- There's no invalidation threshold initially
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
-- Test that threshold is initialized to min value when there's no data
-- and we specify an infinite end. Note that the min value may differ
-- depending on time type.
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
INSERT INTO threshold_test
SELECT v, v FROM generate_series(1, 10) v;
CALL refresh_continuous_aggregate('thresh_2', 0, 5);
-- Threshold should move to end of refresh window (note that window
-- expands to end of bucket).
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
-- Refresh where both the start and end of the window are above the
-- max data value
CALL refresh_continuous_aggregate('thresh_2', 14, NULL);
SELECT watermark AS thresh_hyper_id_watermark
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id \gset
-- Refresh where we start from the current watermark to infinity
CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL);
-- Now refresh with max end of the window to test that the
-- invalidation threshold is capped at the last bucket of data
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
-- Should not have processed invalidations beyond the invalidation
-- threshold.
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
greatest_modified_value AS end
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :thresh_cagg_id
ORDER BY 1,2,3;
-- Check that things are properly materialized
SELECT * FROM thresh_2
ORDER BY 1;
-- Delete the last data
SELECT show_chunks AS chunk_to_drop
FROM show_chunks('threshold_test')
ORDER BY 1 DESC
LIMIT 1 \gset
DELETE FROM threshold_test
WHERE time > 6;
-- The last data in the hypertable is gone
SELECT time_bucket(2, time) AS bucket, max(value) AS max
FROM threshold_test
GROUP BY 1
ORDER BY 1;
-- The aggregate still holds data
SELECT * FROM thresh_2
ORDER BY 1;
-- Refresh the aggregate to bring it up-to-date
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
-- Data also gone from the aggregate
SELECT * FROM thresh_2
ORDER BY 1;
-- The invalidation threshold remains the same
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
-- Insert new data beyond the invalidation threshold to move it
-- forward
INSERT INTO threshold_test
SELECT v, v FROM generate_series(7, 15) v;
CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
-- Aggregate now updated to reflect newly aggregated data
SELECT * FROM thresh_2
ORDER BY 1;
-- The invalidation threshold should have moved forward to the end of
-- the new data
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
-- The aggregate remains invalid beyond the invalidation threshold
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
greatest_modified_value AS end
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE materialization_id = :thresh_cagg_id
ORDER BY 1,2,3;

View File

@@ -55,7 +55,7 @@ ts_run_continuous_agg_materialization(PG_FUNCTION_ARGS)
if (partial_view.name == NULL)
elog(ERROR, "view cannot be NULL");
invalidation_threshold = continuous_agg_invalidation_threshold_get(hypertable_id);
invalidation_threshold = invalidation_threshold_get(hypertable_id);
completed_threshold = ts_continuous_agg_get_completed_threshold(materialization_id);
if (lmv > completed_threshold)