Allow use of infinite refresh windows

When clearing invalidations during a refresh of a continuous
aggregate, one should use the bucketed refresh window rather than the
"raw" user-defined window. This ensures that the window is capped at
the allowable timestamp range (e.g., when using an infinite window)
and that the window used to clear invalidations actually matches what
gets materialized.

Note, however, that since the end of the refresh window is not
inclusive, the last possible time value is not included in the
refresh. To include that value, there needs to exist a
time-type-agnostic definition of "infinity", and both the invalidation
code and the materialization code must be able to handle such windows.
This commit is contained in:
Erik Nordström 2020-08-10 13:30:20 +02:00 committed by Erik Nordström
parent e1c94484cf
commit c3ac8fa193

View File

@ -168,10 +168,12 @@ compute_bucketed_refresh_window(const InternalTimeRange *refresh_window, int64 b
result.end = largest_bucketed_window.end;
else
{
/* The end of the window is non-inclusive so subtract one before bucketing */
int64 exclusive_end = int64_saturating_sub(result.end, 1);
int64 bucketed_end = ts_time_bucket_by_type(bucket_width, exclusive_end, result.type);
/* We get the time value for the start of the bucket, so need to add
* bucket_width to get the end of it */
result.end = ts_time_bucket_by_type(bucket_width, result.end, result.type);
result.end = result.end + bucket_width;
result.end = bucketed_end + bucket_width;
}
return result;
@ -183,16 +185,15 @@ compute_bucketed_refresh_window(const InternalTimeRange *refresh_window, int64 b
* The state holds information for executing a refresh of a continuous aggregate.
*/
static void
continuous_agg_refresh_init(CaggRefreshState *refresh, ContinuousAgg *cagg,
continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window)
{
MemSet(refresh, 0, sizeof(*refresh));
refresh->cagg = *cagg;
refresh->cagg_ht = cagg_get_hypertable_or_fail(cagg->data.mat_hypertable_id);
refresh->refresh_window =
compute_bucketed_refresh_window(refresh_window, cagg->data.bucket_width);
refresh->partial_view.schema = &cagg->data.partial_view_schema;
refresh->partial_view.name = &cagg->data.partial_view_name;
refresh->refresh_window = *refresh_window;
refresh->partial_view.schema = &refresh->cagg.data.partial_view_schema;
refresh->partial_view.name = &refresh->cagg.data.partial_view_name;
/* Lock the continuous aggregate's materialized hypertable to protect
* against concurrent refreshes. Only reads will be allowed. This is a
@ -233,7 +234,6 @@ continuous_agg_refresh_execute(const CaggRefreshState *refresh)
};
Dimension *time_dim = hyperspace_get_open_dimension(refresh->cagg_ht->space, 0);
Assert(time_dim != NULL);
continuous_agg_update_materialization(refresh->partial_view,
cagg_hypertable_name,
&time_dim->fd.column_name,
@ -410,6 +410,14 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
CommitTransactionCommand();
StartTransactionCommand();
cagg = ts_continuous_agg_find_by_relid(cagg_relid);
refresh_window = compute_bucketed_refresh_window(&refresh_window, cagg->data.bucket_width);
elog(DEBUG1,
"computed refresh window at [ " INT64_FORMAT ", " INT64_FORMAT "]",
refresh_window.start,
refresh_window.end);
invalidation_process_cagg_log(cagg, &refresh_window);
continuous_agg_refresh_with_window(cagg, &refresh_window);