Don't access bucket_width directly

Refactoring. Since bucket_width will not be fixed in the future this
patch introduces two new procedures:

- ts_continuous_agg_bucket_width(), for determining the exact width
  of given bucket;
- ts_continuous_agg_max_bucket_width(), for estimating the maximum
  bucket width for given continuous aggregate;

This will allow determining the bucket width on the fly, which is
not possible when ContinuousAgg* -> data.bucket_width is accessed
directly. All accesses to data.bucket_width were changed accordingly.
This commit is contained in:
Aleksander Alekseev 2021-06-15 15:16:24 +03:00 committed by Aleksander Alekseev
parent b0cd6495b3
commit 3f9adffaf0
7 changed files with 52 additions and 14 deletions

View File

@ -871,6 +871,12 @@ typedef struct FormData_continuous_agg
NameData user_view_name;
NameData partial_view_schema;
NameData partial_view_name;
/*
* Don't access bucket_width directly to determine the width of the bucket.
* Use corresponding procedures instead:
* - ts_continuous_agg_bucket_width
* - ts_continuous_agg_max_bucket_width
*/
int64 bucket_width;
NameData direct_view_schema;
NameData direct_view_name;

View File

@ -1001,12 +1001,13 @@ watermark_create(const ContinuousAgg *cagg, MemoryContext top_mctx)
if (!max_isnull)
{
int64 value;
int64 bucket_width = ts_continuous_agg_bucket_width(cagg);
/* The materialized hypertable is already bucketed, which means the
* max is the start of the last bucket. Add one bucket to move to the
* point where the materialized data ends. */
value = ts_time_value_to_internal(maxdat, timetype);
w->value = ts_time_saturating_add(value, cagg->data.bucket_width, timetype);
w->value = ts_time_saturating_add(value, bucket_width, timetype);
}
else
{
@ -1067,3 +1068,29 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
PG_RETURN_INT64(watermark->value);
}
/*
* Determines bucket width for given continuous aggregate.
*
* Currently all buckets are fixed in size but this is going to change in the
* future. Any code that needs to know bucket width has to determine it using
* this procedure instead of accessing any ContinuousAgg fields directly.
*/
int64
ts_continuous_agg_bucket_width(const ContinuousAgg *agg)
{
return agg->data.bucket_width;
}
/*
* Determines maximum possible bucket width for given continuous aggregate.
*
* E.g. for monthly continuous aggreagtes this procedure will return 31 days.
* This is not true for ts_continuous_agg_bucket_width which may use additional
* arguments to determine the width of a concrete bucket.
*/
int64
ts_continuous_agg_max_bucket_width(const ContinuousAgg *agg)
{
return agg->data.bucket_width;
}

View File

@ -90,4 +90,7 @@ extern TSDLLEXPORT const Dimension *
ts_continuous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid);
extern ContinuousAgg *ts_continuous_agg_find_userview_name(const char *schema, const char *name);
extern TSDLLEXPORT int64 ts_continuous_agg_bucket_width(const ContinuousAgg *agg);
extern TSDLLEXPORT int64 ts_continuous_agg_max_bucket_width(const ContinuousAgg *agg);
#endif /* TIMESCALEDB_CONTINUOUS_AGG_H */

View File

@ -358,6 +358,7 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
{
int64 start_offset;
int64 end_offset;
int64 max_bucket_width;
if (config->offset_start.isnull)
start_offset = ts_time_get_max(cagg->partition_type);
@ -369,7 +370,8 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
else
end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);
if (ts_time_saturating_add(end_offset, cagg->data.bucket_width * 2, INT8OID) > start_offset)
max_bucket_width = ts_continuous_agg_max_bucket_width(cagg);
if (ts_time_saturating_add(end_offset, max_bucket_width * 2, INT8OID) > start_offset)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("policy refresh window too small"),

View File

@ -711,6 +711,7 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state)
TupleInfo *ti;
MemoryContext oldmctx;
Invalidation logentry;
int64 bucket_width = ts_continuous_agg_bucket_width(cagg);
oldmctx = MemoryContextSwitchTo(state->per_tuple_mctx);
ti = ts_scan_iterator_tuple_info(&iterator);
@ -719,7 +720,7 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state)
ti,
cagg_hyper_id,
state->dimtype,
cagg->data.bucket_width);
bucket_width);
if (!IS_VALID_INVALIDATION(&mergedentry))
{
@ -882,12 +883,10 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
MemoryContext oldmctx;
Invalidation logentry;
int64 bucket_width = ts_continuous_agg_bucket_width(&state->cagg);
oldmctx = MemoryContextSwitchTo(state->per_tuple_mctx);
invalidation_entry_set_from_cagg_invalidation(&logentry,
ti,
state->dimtype,
state->cagg.data.bucket_width);
invalidation_entry_set_from_cagg_invalidation(&logentry, ti, state->dimtype, bucket_width);
if (!IS_VALID_INVALIDATION(&mergedentry))
mergedentry = logentry;

View File

@ -299,13 +299,11 @@ invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRang
}
else
{
int64 bucket_width = ts_continuous_agg_bucket_width(cagg);
int64 maxval = ts_time_value_to_internal(maxdat, refresh_window->type);
int64 bucket_start =
ts_time_bucket_by_type(cagg->data.bucket_width, maxval, refresh_window->type);
int64 bucket_start = ts_time_bucket_by_type(bucket_width, maxval, refresh_window->type);
/* Add one bucket to get to the end of the last bucket */
return ts_time_saturating_add(bucket_start,
cagg->data.bucket_width,
refresh_window->type);
return ts_time_saturating_add(bucket_start, bucket_width, refresh_window->type);
}
}

View File

@ -405,8 +405,9 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
.end = ts_time_saturating_add(DatumGetInt64(end), 1, refresh_window->type),
};
int64 max_bucket_width = ts_continuous_agg_max_bucket_width(cagg);
InternalTimeRange bucketed_refresh_window =
compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width);
compute_circumscribed_bucketed_refresh_window(&invalidation, max_bucket_width);
if (do_merged_refresh)
{
@ -571,6 +572,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
InternalTimeRange refresh_window;
int64 computed_invalidation_threshold;
int64 invalidation_threshold;
int64 max_bucket_width;
/* Like regular materialized views, require owner to refresh. */
if (!pg_class_ownercheck(cagg->relid, GetUserId()))
@ -589,8 +591,9 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* prevent transaction blocks. */
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
max_bucket_width = ts_continuous_agg_max_bucket_width(cagg);
refresh_window =
compute_inscribed_bucketed_refresh_window(refresh_window_arg, cagg->data.bucket_width);
compute_inscribed_bucketed_refresh_window(refresh_window_arg, max_bucket_width);
if (refresh_window.start >= refresh_window.end)
ereport(ERROR,