mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-21 05:04:32 +08:00
Refactoring: get rid of max_bucket_width
Our code occasionally mentions max_bucket_width. However, in practice, there is no such thing. For fixed-sized buckets, bucket_width and max_bucket_width are always the same, while for variable-sized buckets bucket_width is not used at all (except the fact that it equals -1 to indicate that the bucket size is variable). This patch removes any use of max_bucket_width, except for arguments of: - _timescaledb_internal.invalidation_process_hypertable_log() - _timescaledb_internal.invalidation_process_cagg_log() The signatures of these functions were not changed for backward compatibility between access and data nodes, which can run different versions of TimescaleDB.
This commit is contained in:
parent
beb8527def
commit
91f3edf609
@ -51,8 +51,8 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.drop_dist_ht_invalidation_trigg
|
|||||||
-- Access Node that belong to 'raw_hypertable_id'
|
-- Access Node that belong to 'raw_hypertable_id'
|
||||||
-- bucket_widths - The array of time bucket widths for all the CAGGs that belong to
|
-- bucket_widths - The array of time bucket widths for all the CAGGs that belong to
|
||||||
-- 'raw_hypertable_id'
|
-- 'raw_hypertable_id'
|
||||||
-- max_bucket_widths - The array of the maximum time bucket widths for all the CAGGs that belong
|
-- max_bucket_widths - (Deprecated) This argument is ignored and is present only
|
||||||
-- to 'raw_hypertable_id'
|
-- for backward compatibility.
|
||||||
-- bucket_functions - (Optional) The array of serialized information about bucket functions
|
-- bucket_functions - (Optional) The array of serialized information about bucket functions
|
||||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.invalidation_process_hypertable_log(
|
CREATE OR REPLACE FUNCTION _timescaledb_internal.invalidation_process_hypertable_log(
|
||||||
mat_hypertable_id INTEGER,
|
mat_hypertable_id INTEGER,
|
||||||
@ -87,8 +87,8 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.invalidation_process_hypertable
|
|||||||
-- Access Node that belong to 'raw_hypertable_id'
|
-- Access Node that belong to 'raw_hypertable_id'
|
||||||
-- bucket_widths - The array of time bucket widths for all the CAGGs that belong to
|
-- bucket_widths - The array of time bucket widths for all the CAGGs that belong to
|
||||||
-- 'raw_hypertable_id'
|
-- 'raw_hypertable_id'
|
||||||
-- max_bucket_widths - The array of the maximum time bucket widths for all the CAGGs that belong
|
-- max_bucket_widths - (Deprecated) This argument is ignored and is present only
|
||||||
-- to 'raw_hypertable_id'
|
-- for backward compatibility.
|
||||||
-- bucket_functions - (Optional) The array of serialized information about bucket functions
|
-- bucket_functions - (Optional) The array of serialized information about bucket functions
|
||||||
--
|
--
|
||||||
-- Returns a tuple of:
|
-- Returns a tuple of:
|
||||||
|
@ -884,7 +884,6 @@ typedef struct FormData_continuous_agg
|
|||||||
* procedures instead, such as:
|
* procedures instead, such as:
|
||||||
* - ts_continuous_agg_bucket_width_variable
|
* - ts_continuous_agg_bucket_width_variable
|
||||||
* - ts_continuous_agg_bucket_width
|
* - ts_continuous_agg_bucket_width
|
||||||
* - ts_continuous_agg_max_bucket_width
|
|
||||||
* - ts_bucket_function_to_bucket_width_in_months
|
* - ts_bucket_function_to_bucket_width_in_months
|
||||||
*/
|
*/
|
||||||
int64 bucket_width;
|
int64 bucket_width;
|
||||||
|
@ -361,7 +361,6 @@ ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id)
|
|||||||
Datum bucket_width;
|
Datum bucket_width;
|
||||||
|
|
||||||
all_caggs_info.bucket_widths = NIL;
|
all_caggs_info.bucket_widths = NIL;
|
||||||
all_caggs_info.max_bucket_widths = NIL;
|
|
||||||
all_caggs_info.mat_hypertable_ids = NIL;
|
all_caggs_info.mat_hypertable_ids = NIL;
|
||||||
all_caggs_info.bucket_functions = NIL;
|
all_caggs_info.bucket_functions = NIL;
|
||||||
|
|
||||||
@ -377,12 +376,6 @@ ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id)
|
|||||||
all_caggs_info.bucket_widths =
|
all_caggs_info.bucket_widths =
|
||||||
lappend(all_caggs_info.bucket_widths, DatumGetPointer(bucket_width));
|
lappend(all_caggs_info.bucket_widths, DatumGetPointer(bucket_width));
|
||||||
|
|
||||||
bucket_width = Int64GetDatum(ts_continuous_agg_bucket_width_variable(cagg) ?
|
|
||||||
BUCKET_WIDTH_VARIABLE :
|
|
||||||
ts_continuous_agg_max_bucket_width(cagg));
|
|
||||||
all_caggs_info.max_bucket_widths =
|
|
||||||
lappend(all_caggs_info.max_bucket_widths, DatumGetPointer(bucket_width));
|
|
||||||
|
|
||||||
all_caggs_info.bucket_functions =
|
all_caggs_info.bucket_functions =
|
||||||
lappend(all_caggs_info.bucket_functions, cagg->bucket_function);
|
lappend(all_caggs_info.bucket_functions, cagg->bucket_function);
|
||||||
|
|
||||||
@ -492,34 +485,29 @@ bucket_function_deserialize(const char *str)
|
|||||||
*/
|
*/
|
||||||
TSDLLEXPORT void
|
TSDLLEXPORT void
|
||||||
ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids, ArrayType *bucket_widths,
|
ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids, ArrayType *bucket_widths,
|
||||||
ArrayType *max_bucket_widths, ArrayType *bucket_functions,
|
ArrayType *bucket_functions, CaggsInfo *all_caggs)
|
||||||
CaggsInfo *all_caggs)
|
|
||||||
{
|
{
|
||||||
all_caggs->mat_hypertable_ids = NIL;
|
all_caggs->mat_hypertable_ids = NIL;
|
||||||
all_caggs->bucket_widths = NIL;
|
all_caggs->bucket_widths = NIL;
|
||||||
all_caggs->max_bucket_widths = NIL;
|
|
||||||
all_caggs->bucket_functions = NIL;
|
all_caggs->bucket_functions = NIL;
|
||||||
|
|
||||||
Assert(ARR_NDIM(mat_hypertable_ids) > 0 && ARR_NDIM(bucket_widths) > 0 &&
|
Assert(ARR_NDIM(mat_hypertable_ids) > 0 && ARR_NDIM(bucket_widths) > 0 &&
|
||||||
ARR_NDIM(max_bucket_widths) > 0 && ARR_NDIM(bucket_functions) > 0);
|
ARR_NDIM(bucket_functions) > 0);
|
||||||
Assert(ARR_NDIM(mat_hypertable_ids) == ARR_NDIM(bucket_widths) &&
|
Assert(ARR_NDIM(mat_hypertable_ids) == ARR_NDIM(bucket_widths) &&
|
||||||
ARR_NDIM(max_bucket_widths) == ARR_NDIM(bucket_widths) &&
|
|
||||||
ARR_NDIM(bucket_functions) == ARR_NDIM(bucket_widths));
|
ARR_NDIM(bucket_functions) == ARR_NDIM(bucket_widths));
|
||||||
|
|
||||||
ArrayIterator it_htids, it_widths, it_maxes, it_bfs;
|
ArrayIterator it_htids, it_widths, it_bfs;
|
||||||
Datum array_datum1, array_datum2, array_datum3, array_datum4;
|
Datum array_datum1, array_datum2, array_datum3;
|
||||||
bool isnull1, isnull2, isnull3, isnull4;
|
bool isnull1, isnull2, isnull3;
|
||||||
|
|
||||||
it_htids = array_create_iterator(mat_hypertable_ids, 0, NULL);
|
it_htids = array_create_iterator(mat_hypertable_ids, 0, NULL);
|
||||||
it_widths = array_create_iterator(bucket_widths, 0, NULL);
|
it_widths = array_create_iterator(bucket_widths, 0, NULL);
|
||||||
it_maxes = array_create_iterator(max_bucket_widths, 0, NULL);
|
|
||||||
it_bfs = array_create_iterator(bucket_functions, 0, NULL);
|
it_bfs = array_create_iterator(bucket_functions, 0, NULL);
|
||||||
while (array_iterate(it_htids, &array_datum1, &isnull1) &&
|
while (array_iterate(it_htids, &array_datum1, &isnull1) &&
|
||||||
array_iterate(it_widths, &array_datum2, &isnull2) &&
|
array_iterate(it_widths, &array_datum2, &isnull2) &&
|
||||||
array_iterate(it_maxes, &array_datum3, &isnull3) &&
|
array_iterate(it_bfs, &array_datum3, &isnull3))
|
||||||
array_iterate(it_bfs, &array_datum4, &isnull4))
|
|
||||||
{
|
{
|
||||||
Assert(!isnull1 && !isnull2 && !isnull3 && !isnull4);
|
Assert(!isnull1 && !isnull2 && !isnull3);
|
||||||
int32 mat_hypertable_id = DatumGetInt32(array_datum1);
|
int32 mat_hypertable_id = DatumGetInt32(array_datum1);
|
||||||
all_caggs->mat_hypertable_ids =
|
all_caggs->mat_hypertable_ids =
|
||||||
lappend_int(all_caggs->mat_hypertable_ids, mat_hypertable_id);
|
lappend_int(all_caggs->mat_hypertable_ids, mat_hypertable_id);
|
||||||
@ -528,19 +516,15 @@ ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids, ArrayType *buc
|
|||||||
bucket_width = array_datum2;
|
bucket_width = array_datum2;
|
||||||
all_caggs->bucket_widths = lappend(all_caggs->bucket_widths, DatumGetPointer(bucket_width));
|
all_caggs->bucket_widths = lappend(all_caggs->bucket_widths, DatumGetPointer(bucket_width));
|
||||||
|
|
||||||
bucket_width = array_datum3;
|
|
||||||
all_caggs->max_bucket_widths =
|
|
||||||
lappend(all_caggs->max_bucket_widths, DatumGetPointer(bucket_width));
|
|
||||||
|
|
||||||
const ContinuousAggsBucketFunction *bucket_function =
|
const ContinuousAggsBucketFunction *bucket_function =
|
||||||
bucket_function_deserialize(TextDatumGetCString(array_datum4));
|
bucket_function_deserialize(TextDatumGetCString(array_datum3));
|
||||||
/* bucket_function is cast to non-const type to make Visual Studio happy */
|
/* bucket_function is cast to non-const type to make Visual Studio happy */
|
||||||
all_caggs->bucket_functions =
|
all_caggs->bucket_functions =
|
||||||
lappend(all_caggs->bucket_functions, (ContinuousAggsBucketFunction *) bucket_function);
|
lappend(all_caggs->bucket_functions, (ContinuousAggsBucketFunction *) bucket_function);
|
||||||
}
|
}
|
||||||
array_free_iterator(it_htids);
|
array_free_iterator(it_htids);
|
||||||
array_free_iterator(it_widths);
|
array_free_iterator(it_widths);
|
||||||
array_free_iterator(it_maxes);
|
array_free_iterator(it_bfs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -549,34 +533,29 @@ ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids, ArrayType *buc
|
|||||||
*/
|
*/
|
||||||
TSDLLEXPORT void
|
TSDLLEXPORT void
|
||||||
ts_create_arrays_from_caggs_info(const CaggsInfo *all_caggs, ArrayType **mat_hypertable_ids,
|
ts_create_arrays_from_caggs_info(const CaggsInfo *all_caggs, ArrayType **mat_hypertable_ids,
|
||||||
ArrayType **bucket_widths, ArrayType **max_bucket_widths,
|
ArrayType **bucket_widths, ArrayType **bucket_functions)
|
||||||
ArrayType **bucket_functions)
|
|
||||||
{
|
{
|
||||||
ListCell *lc1, *lc2, *lc3, *lc4;
|
ListCell *lc1, *lc2, *lc3;
|
||||||
unsigned i;
|
unsigned i;
|
||||||
|
|
||||||
Datum *matiddatums = palloc(sizeof(Datum) * list_length(all_caggs->mat_hypertable_ids));
|
Datum *matiddatums = palloc(sizeof(Datum) * list_length(all_caggs->mat_hypertable_ids));
|
||||||
Datum *widthdatums = palloc(sizeof(Datum) * list_length(all_caggs->bucket_widths));
|
Datum *widthdatums = palloc(sizeof(Datum) * list_length(all_caggs->bucket_widths));
|
||||||
Datum *maxwidthdatums = palloc(sizeof(Datum) * list_length(all_caggs->max_bucket_widths));
|
|
||||||
Datum *bucketfunctions = palloc(sizeof(Datum) * list_length(all_caggs->bucket_functions));
|
Datum *bucketfunctions = palloc(sizeof(Datum) * list_length(all_caggs->bucket_functions));
|
||||||
|
|
||||||
i = 0;
|
i = 0;
|
||||||
forfour(lc1,
|
forthree (lc1,
|
||||||
all_caggs->mat_hypertable_ids,
|
all_caggs->mat_hypertable_ids,
|
||||||
lc2,
|
lc2,
|
||||||
all_caggs->bucket_widths,
|
all_caggs->bucket_widths,
|
||||||
lc3,
|
lc3,
|
||||||
all_caggs->max_bucket_widths,
|
all_caggs->bucket_functions)
|
||||||
lc4,
|
|
||||||
all_caggs->bucket_functions)
|
|
||||||
{
|
{
|
||||||
int32 cagg_hyper_id = lfirst_int(lc1);
|
int32 cagg_hyper_id = lfirst_int(lc1);
|
||||||
matiddatums[i] = Int32GetDatum(cagg_hyper_id);
|
matiddatums[i] = Int32GetDatum(cagg_hyper_id);
|
||||||
|
|
||||||
widthdatums[i] = PointerGetDatum(lfirst(lc2));
|
widthdatums[i] = PointerGetDatum(lfirst(lc2));
|
||||||
maxwidthdatums[i] = PointerGetDatum(lfirst(lc3));
|
|
||||||
|
|
||||||
const ContinuousAggsBucketFunction *bucket_function = lfirst(lc4);
|
const ContinuousAggsBucketFunction *bucket_function = lfirst(lc3);
|
||||||
bucketfunctions[i] = CStringGetTextDatum(bucket_function_serialize(bucket_function));
|
bucketfunctions[i] = CStringGetTextDatum(bucket_function_serialize(bucket_function));
|
||||||
|
|
||||||
++i;
|
++i;
|
||||||
@ -596,13 +575,6 @@ ts_create_arrays_from_caggs_info(const CaggsInfo *all_caggs, ArrayType **mat_hyp
|
|||||||
FLOAT8PASSBYVAL,
|
FLOAT8PASSBYVAL,
|
||||||
TYPALIGN_DOUBLE);
|
TYPALIGN_DOUBLE);
|
||||||
|
|
||||||
*max_bucket_widths = construct_array(maxwidthdatums,
|
|
||||||
list_length(all_caggs->max_bucket_widths),
|
|
||||||
INT8OID,
|
|
||||||
8,
|
|
||||||
FLOAT8PASSBYVAL,
|
|
||||||
TYPALIGN_DOUBLE);
|
|
||||||
|
|
||||||
*bucket_functions = construct_array(bucketfunctions,
|
*bucket_functions = construct_array(bucketfunctions,
|
||||||
list_length(all_caggs->bucket_functions),
|
list_length(all_caggs->bucket_functions),
|
||||||
TEXTOID,
|
TEXTOID,
|
||||||
@ -1510,21 +1482,6 @@ ts_continuous_agg_bucket_width(const ContinuousAgg *agg)
|
|||||||
return agg->data.bucket_width;
|
return agg->data.bucket_width;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Determines maximum possible bucket width for given continuous aggregate. */
|
|
||||||
int64
|
|
||||||
ts_continuous_agg_max_bucket_width(const ContinuousAgg *agg)
|
|
||||||
{
|
|
||||||
if (ts_continuous_agg_bucket_width_variable(agg))
|
|
||||||
{
|
|
||||||
/* should never happen, this code is useful mostly for debugging purposes */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
||||||
errmsg("maximum bucket width is not defined for a variable bucket")));
|
|
||||||
}
|
|
||||||
|
|
||||||
return agg->data.bucket_width;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Determines bucket size in months for given bucketing function.
|
* Determines bucket size in months for given bucketing function.
|
||||||
*
|
*
|
||||||
|
@ -110,8 +110,6 @@ typedef struct CaggsInfoData
|
|||||||
List *mat_hypertable_ids;
|
List *mat_hypertable_ids;
|
||||||
/* (int64) Datum elements; stores BUCKET_WIDTH_VARIABLE for variable buckets */
|
/* (int64) Datum elements; stores BUCKET_WIDTH_VARIABLE for variable buckets */
|
||||||
List *bucket_widths;
|
List *bucket_widths;
|
||||||
/* (int64) Datum elements; stores BUCKET_WIDTH_VARIABLE for variable buckets */
|
|
||||||
List *max_bucket_widths;
|
|
||||||
/* (const ContinuousAggsBucketFunction *) elements; stores NULL for fixed buckets */
|
/* (const ContinuousAggsBucketFunction *) elements; stores NULL for fixed buckets */
|
||||||
List *bucket_functions;
|
List *bucket_functions;
|
||||||
} CaggsInfo;
|
} CaggsInfo;
|
||||||
@ -119,13 +117,11 @@ typedef struct CaggsInfoData
|
|||||||
extern TSDLLEXPORT const CaggsInfo ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id);
|
extern TSDLLEXPORT const CaggsInfo ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id);
|
||||||
extern TSDLLEXPORT void ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids,
|
extern TSDLLEXPORT void ts_populate_caggs_info_from_arrays(ArrayType *mat_hypertable_ids,
|
||||||
ArrayType *bucket_widths,
|
ArrayType *bucket_widths,
|
||||||
ArrayType *max_bucket_widths,
|
|
||||||
ArrayType *bucket_functions,
|
ArrayType *bucket_functions,
|
||||||
CaggsInfo *all_caggs);
|
CaggsInfo *all_caggs);
|
||||||
TSDLLEXPORT void ts_create_arrays_from_caggs_info(const CaggsInfo *all_caggs,
|
TSDLLEXPORT void ts_create_arrays_from_caggs_info(const CaggsInfo *all_caggs,
|
||||||
ArrayType **mat_hypertable_ids,
|
ArrayType **mat_hypertable_ids,
|
||||||
ArrayType **bucket_widths,
|
ArrayType **bucket_widths,
|
||||||
ArrayType **max_bucket_widths,
|
|
||||||
ArrayType **bucket_functions);
|
ArrayType **bucket_functions);
|
||||||
|
|
||||||
extern TSDLLEXPORT ContinuousAgg *
|
extern TSDLLEXPORT ContinuousAgg *
|
||||||
@ -161,7 +157,6 @@ extern ContinuousAgg *ts_continuous_agg_find_userview_name(const char *schema, c
|
|||||||
|
|
||||||
extern TSDLLEXPORT bool ts_continuous_agg_bucket_width_variable(const ContinuousAgg *agg);
|
extern TSDLLEXPORT bool ts_continuous_agg_bucket_width_variable(const ContinuousAgg *agg);
|
||||||
extern TSDLLEXPORT int64 ts_continuous_agg_bucket_width(const ContinuousAgg *agg);
|
extern TSDLLEXPORT int64 ts_continuous_agg_bucket_width(const ContinuousAgg *agg);
|
||||||
extern TSDLLEXPORT int64 ts_continuous_agg_max_bucket_width(const ContinuousAgg *agg);
|
|
||||||
extern TSDLLEXPORT int32
|
extern TSDLLEXPORT int32
|
||||||
ts_bucket_function_to_bucket_width_in_months(const ContinuousAggsBucketFunction *agg);
|
ts_bucket_function_to_bucket_width_in_months(const ContinuousAggsBucketFunction *agg);
|
||||||
|
|
||||||
|
@ -358,7 +358,7 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
|
|||||||
{
|
{
|
||||||
int64 start_offset;
|
int64 start_offset;
|
||||||
int64 end_offset;
|
int64 end_offset;
|
||||||
int64 max_bucket_width;
|
int64 bucket_width;
|
||||||
|
|
||||||
if (config->offset_start.isnull)
|
if (config->offset_start.isnull)
|
||||||
start_offset = ts_time_get_max(cagg->partition_type);
|
start_offset = ts_time_get_max(cagg->partition_type);
|
||||||
@ -370,8 +370,8 @@ validate_window_size(const ContinuousAgg *cagg, const CaggPolicyConfig *config)
|
|||||||
else
|
else
|
||||||
end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);
|
end_offset = interval_to_int64(config->offset_end.value, config->offset_end.type);
|
||||||
|
|
||||||
max_bucket_width = ts_continuous_agg_max_bucket_width(cagg);
|
bucket_width = ts_continuous_agg_bucket_width(cagg);
|
||||||
if (ts_time_saturating_add(end_offset, max_bucket_width * 2, INT8OID) > start_offset)
|
if (ts_time_saturating_add(end_offset, bucket_width * 2, INT8OID) > start_offset)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
errmsg("policy refresh window too small"),
|
errmsg("policy refresh window too small"),
|
||||||
|
@ -89,9 +89,6 @@
|
|||||||
#define INTERNAL_TO_TSTZ_FUNCTION "to_timestamp"
|
#define INTERNAL_TO_TSTZ_FUNCTION "to_timestamp"
|
||||||
#define INTERNAL_TO_TS_FUNCTION "to_timestamp_without_timezone"
|
#define INTERNAL_TO_TS_FUNCTION "to_timestamp_without_timezone"
|
||||||
|
|
||||||
#define DEFAULT_MAX_INTERVAL_MULTIPLIER 20
|
|
||||||
#define DEFAULT_MAX_INTERVAL_MAX_BUCKET_WIDTH (PG_INT64_MAX / DEFAULT_MAX_INTERVAL_MULTIPLIER)
|
|
||||||
|
|
||||||
/*switch to ts user for _timescaledb_internal access */
|
/*switch to ts user for _timescaledb_internal access */
|
||||||
#define SWITCH_TO_TS_USER(schemaname, newuid, saved_uid, saved_secctx) \
|
#define SWITCH_TO_TS_USER(schemaname, newuid, saved_uid, saved_secctx) \
|
||||||
do \
|
do \
|
||||||
|
@ -101,8 +101,7 @@ typedef struct CaggInvalidationState
|
|||||||
Tuplestorestate *invalidations;
|
Tuplestorestate *invalidations;
|
||||||
const CaggsInfo *all_caggs;
|
const CaggsInfo *all_caggs;
|
||||||
int64 bucket_width;
|
int64 bucket_width;
|
||||||
int64 max_bucket_width;
|
/* bucket_function is NULL unless bucket_width == BUCKET_WIDTH_VARIABLE */
|
||||||
/* bucket_function is NULL unless bucket_width == max_bucket_width == BUCKET_WIDTH_VARIABLE */
|
|
||||||
const ContinuousAggsBucketFunction *bucket_function;
|
const ContinuousAggsBucketFunction *bucket_function;
|
||||||
} CaggInvalidationState;
|
} CaggInvalidationState;
|
||||||
|
|
||||||
@ -1051,7 +1050,7 @@ static void
|
|||||||
invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id,
|
invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id,
|
||||||
int32 raw_hypertable_id, Oid dimtype, const CaggsInfo *all_caggs)
|
int32 raw_hypertable_id, Oid dimtype, const CaggsInfo *all_caggs)
|
||||||
{
|
{
|
||||||
ListCell *lc1, *lc2, *lc3, *lc4;
|
ListCell *lc1, *lc2, *lc3;
|
||||||
bool PG_USED_FOR_ASSERTS_ONLY found = false;
|
bool PG_USED_FOR_ASSERTS_ONLY found = false;
|
||||||
|
|
||||||
state->mat_hypertable_id = mat_hypertable_id;
|
state->mat_hypertable_id = mat_hypertable_id;
|
||||||
@ -1063,25 +1062,19 @@ invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id,
|
|||||||
"Continuous aggregate invalidations",
|
"Continuous aggregate invalidations",
|
||||||
ALLOCSET_DEFAULT_SIZES);
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
|
state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
|
||||||
forfour(lc1,
|
forthree (lc1,
|
||||||
all_caggs->mat_hypertable_ids,
|
all_caggs->mat_hypertable_ids,
|
||||||
lc2,
|
lc2,
|
||||||
all_caggs->bucket_widths,
|
all_caggs->bucket_widths,
|
||||||
lc3,
|
lc3,
|
||||||
all_caggs->max_bucket_widths,
|
all_caggs->bucket_functions)
|
||||||
lc4,
|
|
||||||
all_caggs->bucket_functions)
|
|
||||||
{
|
{
|
||||||
int32 cagg_hyper_id = lfirst_int(lc1);
|
int32 cagg_hyper_id = lfirst_int(lc1);
|
||||||
|
|
||||||
if (cagg_hyper_id == mat_hypertable_id)
|
if (cagg_hyper_id == mat_hypertable_id)
|
||||||
{
|
{
|
||||||
int64 bucket_width = DatumGetInt64(PointerGetDatum(lfirst(lc2)));
|
state->bucket_width = DatumGetInt64(PointerGetDatum(lfirst(lc2)));
|
||||||
int64 max_bucket_width = DatumGetInt64(PointerGetDatum(lfirst(lc3)));
|
state->bucket_function = lfirst(lc3);
|
||||||
|
|
||||||
state->bucket_width = bucket_width;
|
|
||||||
state->max_bucket_width = max_bucket_width;
|
|
||||||
state->bucket_function = lfirst(lc4);
|
|
||||||
|
|
||||||
found = true;
|
found = true;
|
||||||
break;
|
break;
|
||||||
@ -1147,8 +1140,8 @@ bucket_functions_default_argument(int ndim)
|
|||||||
* the Access Node that belong to 'raw_hypertable_id'.
|
* the Access Node that belong to 'raw_hypertable_id'.
|
||||||
* @param bucket_widths The array of time bucket widths for all the CAGGs that belong to
|
* @param bucket_widths The array of time bucket widths for all the CAGGs that belong to
|
||||||
* 'raw_hypertable_id'.
|
* 'raw_hypertable_id'.
|
||||||
* @param max_bucket_widths The array of the maximum time bucket widths for all the CAGGs that
|
* @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward
|
||||||
* belong to 'raw_hypertable_id'.
|
* compatibility.
|
||||||
* @param bucket_functions (Optional) The array of serialized information about bucket functions.
|
* @param bucket_functions (Optional) The array of serialized information about bucket functions.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
@ -1159,7 +1152,6 @@ tsl_invalidation_process_hypertable_log(PG_FUNCTION_ARGS)
|
|||||||
Oid dimtype = PG_GETARG_OID(2);
|
Oid dimtype = PG_GETARG_OID(2);
|
||||||
ArrayType *mat_hypertable_ids = PG_GETARG_ARRAYTYPE_P(3);
|
ArrayType *mat_hypertable_ids = PG_GETARG_ARRAYTYPE_P(3);
|
||||||
ArrayType *bucket_widths = PG_GETARG_ARRAYTYPE_P(4);
|
ArrayType *bucket_widths = PG_GETARG_ARRAYTYPE_P(4);
|
||||||
ArrayType *max_bucket_widths = PG_GETARG_ARRAYTYPE_P(5);
|
|
||||||
ArrayType *bucket_functions = PG_NARGS() > 6 ?
|
ArrayType *bucket_functions = PG_NARGS() > 6 ?
|
||||||
PG_GETARG_ARRAYTYPE_P(6) :
|
PG_GETARG_ARRAYTYPE_P(6) :
|
||||||
bucket_functions_default_argument(ARR_NDIM(bucket_widths));
|
bucket_functions_default_argument(ARR_NDIM(bucket_widths));
|
||||||
@ -1167,7 +1159,6 @@ tsl_invalidation_process_hypertable_log(PG_FUNCTION_ARGS)
|
|||||||
|
|
||||||
ts_populate_caggs_info_from_arrays(mat_hypertable_ids,
|
ts_populate_caggs_info_from_arrays(mat_hypertable_ids,
|
||||||
bucket_widths,
|
bucket_widths,
|
||||||
max_bucket_widths,
|
|
||||||
bucket_functions,
|
bucket_functions,
|
||||||
&all_caggs_info);
|
&all_caggs_info);
|
||||||
|
|
||||||
@ -1188,7 +1179,6 @@ remote_invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hy
|
|||||||
Oid func_oid;
|
Oid func_oid;
|
||||||
ArrayType *mat_hypertable_ids;
|
ArrayType *mat_hypertable_ids;
|
||||||
ArrayType *bucket_widths;
|
ArrayType *bucket_widths;
|
||||||
ArrayType *max_bucket_widths;
|
|
||||||
ArrayType *bucket_functions;
|
ArrayType *bucket_functions;
|
||||||
LOCAL_FCINFO(fcinfo, INVALIDATION_PROCESS_HYPERTABLE_LOG_NARGS);
|
LOCAL_FCINFO(fcinfo, INVALIDATION_PROCESS_HYPERTABLE_LOG_NARGS);
|
||||||
FmgrInfo flinfo;
|
FmgrInfo flinfo;
|
||||||
@ -1197,7 +1187,6 @@ remote_invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hy
|
|||||||
ts_create_arrays_from_caggs_info(all_caggs,
|
ts_create_arrays_from_caggs_info(all_caggs,
|
||||||
&mat_hypertable_ids,
|
&mat_hypertable_ids,
|
||||||
&bucket_widths,
|
&bucket_widths,
|
||||||
&max_bucket_widths,
|
|
||||||
&bucket_functions);
|
&bucket_functions);
|
||||||
|
|
||||||
static const Oid type_id[INVALIDATION_PROCESS_HYPERTABLE_LOG_NARGS] = {
|
static const Oid type_id[INVALIDATION_PROCESS_HYPERTABLE_LOG_NARGS] = {
|
||||||
@ -1230,7 +1219,7 @@ remote_invalidation_process_hypertable_log(int32 mat_hypertable_id, int32 raw_hy
|
|||||||
FC_ARG(fcinfo, 2) = ObjectIdGetDatum(dimtype);
|
FC_ARG(fcinfo, 2) = ObjectIdGetDatum(dimtype);
|
||||||
FC_ARG(fcinfo, 3) = PointerGetDatum(mat_hypertable_ids);
|
FC_ARG(fcinfo, 3) = PointerGetDatum(mat_hypertable_ids);
|
||||||
FC_ARG(fcinfo, 4) = PointerGetDatum(bucket_widths);
|
FC_ARG(fcinfo, 4) = PointerGetDatum(bucket_widths);
|
||||||
FC_ARG(fcinfo, 5) = PointerGetDatum(max_bucket_widths);
|
FC_ARG(fcinfo, 5) = PointerGetDatum(construct_empty_array(INT8OID));
|
||||||
FC_ARG(fcinfo, 6) = PointerGetDatum(bucket_functions);
|
FC_ARG(fcinfo, 6) = PointerGetDatum(bucket_functions);
|
||||||
/* Check for null result, since caller is clearly not expecting one */
|
/* Check for null result, since caller is clearly not expecting one */
|
||||||
if (fcinfo->isnull)
|
if (fcinfo->isnull)
|
||||||
@ -1291,7 +1280,7 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
|
|||||||
InternalTimeRange merged_refresh_window;
|
InternalTimeRange merged_refresh_window;
|
||||||
continuous_agg_calculate_merged_refresh_window(refresh_window,
|
continuous_agg_calculate_merged_refresh_window(refresh_window,
|
||||||
store,
|
store,
|
||||||
state.max_bucket_width,
|
state.bucket_width,
|
||||||
state.bucket_function,
|
state.bucket_function,
|
||||||
&merged_refresh_window);
|
&merged_refresh_window);
|
||||||
*do_merged_refresh = true;
|
*do_merged_refresh = true;
|
||||||
@ -1319,8 +1308,8 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
|
|||||||
* the Access Node that belong to 'raw_hypertable_id'.
|
* the Access Node that belong to 'raw_hypertable_id'.
|
||||||
* @param bucket_widths The array of time bucket widths for all the CAGGs that belong to
|
* @param bucket_widths The array of time bucket widths for all the CAGGs that belong to
|
||||||
* 'raw_hypertable_id'.
|
* 'raw_hypertable_id'.
|
||||||
* @param max_bucket_widths The array of the maximum time bucket widths for all the CAGGs that
|
* @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward
|
||||||
* belong to 'raw_hypertable_id'.
|
* compatibility.
|
||||||
* @param bucket_functions (Optional) The array of serialized information about bucket functions.
|
* @param bucket_functions (Optional) The array of serialized information about bucket functions.
|
||||||
* @return a tuple of:
|
* @return a tuple of:
|
||||||
* ret_window_start - The merged refresh window starting time
|
* ret_window_start - The merged refresh window starting time
|
||||||
@ -1339,7 +1328,6 @@ tsl_invalidation_process_cagg_log(PG_FUNCTION_ARGS)
|
|||||||
.end = PG_GETARG_INT64(4) };
|
.end = PG_GETARG_INT64(4) };
|
||||||
ArrayType *mat_hypertable_ids = PG_GETARG_ARRAYTYPE_P(5);
|
ArrayType *mat_hypertable_ids = PG_GETARG_ARRAYTYPE_P(5);
|
||||||
ArrayType *bucket_widths = PG_GETARG_ARRAYTYPE_P(6);
|
ArrayType *bucket_widths = PG_GETARG_ARRAYTYPE_P(6);
|
||||||
ArrayType *max_bucket_widths = PG_GETARG_ARRAYTYPE_P(7);
|
|
||||||
ArrayType *bucket_functions = PG_NARGS() > 8 ?
|
ArrayType *bucket_functions = PG_NARGS() > 8 ?
|
||||||
PG_GETARG_ARRAYTYPE_P(8) :
|
PG_GETARG_ARRAYTYPE_P(8) :
|
||||||
bucket_functions_default_argument(ARR_NDIM(bucket_widths));
|
bucket_functions_default_argument(ARR_NDIM(bucket_widths));
|
||||||
@ -1348,7 +1336,6 @@ tsl_invalidation_process_cagg_log(PG_FUNCTION_ARGS)
|
|||||||
|
|
||||||
ts_populate_caggs_info_from_arrays(mat_hypertable_ids,
|
ts_populate_caggs_info_from_arrays(mat_hypertable_ids,
|
||||||
bucket_widths,
|
bucket_widths,
|
||||||
max_bucket_widths,
|
|
||||||
bucket_functions,
|
bucket_functions,
|
||||||
&all_caggs_info);
|
&all_caggs_info);
|
||||||
|
|
||||||
@ -1400,7 +1387,6 @@ remote_invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertab
|
|||||||
Oid func_oid;
|
Oid func_oid;
|
||||||
ArrayType *mat_hypertable_ids;
|
ArrayType *mat_hypertable_ids;
|
||||||
ArrayType *bucket_widths;
|
ArrayType *bucket_widths;
|
||||||
ArrayType *max_bucket_widths;
|
|
||||||
ArrayType *bucket_functions;
|
ArrayType *bucket_functions;
|
||||||
LOCAL_FCINFO(fcinfo, INVALIDATION_PROCESS_CAGG_LOG_NARGS);
|
LOCAL_FCINFO(fcinfo, INVALIDATION_PROCESS_CAGG_LOG_NARGS);
|
||||||
FmgrInfo flinfo;
|
FmgrInfo flinfo;
|
||||||
@ -1411,7 +1397,6 @@ remote_invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertab
|
|||||||
ts_create_arrays_from_caggs_info(all_caggs,
|
ts_create_arrays_from_caggs_info(all_caggs,
|
||||||
&mat_hypertable_ids,
|
&mat_hypertable_ids,
|
||||||
&bucket_widths,
|
&bucket_widths,
|
||||||
&max_bucket_widths,
|
|
||||||
&bucket_functions);
|
&bucket_functions);
|
||||||
|
|
||||||
static const Oid type_id[INVALIDATION_PROCESS_CAGG_LOG_NARGS] = {
|
static const Oid type_id[INVALIDATION_PROCESS_CAGG_LOG_NARGS] = {
|
||||||
@ -1447,7 +1432,7 @@ remote_invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertab
|
|||||||
FC_ARG(fcinfo, 4) = Int64GetDatum(refresh_window->end);
|
FC_ARG(fcinfo, 4) = Int64GetDatum(refresh_window->end);
|
||||||
FC_ARG(fcinfo, 5) = PointerGetDatum(mat_hypertable_ids);
|
FC_ARG(fcinfo, 5) = PointerGetDatum(mat_hypertable_ids);
|
||||||
FC_ARG(fcinfo, 6) = PointerGetDatum(bucket_widths);
|
FC_ARG(fcinfo, 6) = PointerGetDatum(bucket_widths);
|
||||||
FC_ARG(fcinfo, 7) = PointerGetDatum(max_bucket_widths);
|
FC_ARG(fcinfo, 7) = PointerGetDatum(construct_empty_array(INT8OID));
|
||||||
FC_ARG(fcinfo, 8) = PointerGetDatum(bucket_functions);
|
FC_ARG(fcinfo, 8) = PointerGetDatum(bucket_functions);
|
||||||
/* Check for null result, since caller is clearly not expecting one */
|
/* Check for null result, since caller is clearly not expecting one */
|
||||||
if (fcinfo->isnull)
|
if (fcinfo->isnull)
|
||||||
|
@ -466,7 +466,7 @@ update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window, c
|
|||||||
static long
|
static long
|
||||||
continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_window,
|
continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_window,
|
||||||
const InvalidationStore *invalidations,
|
const InvalidationStore *invalidations,
|
||||||
const int64 max_bucket_width,
|
const int64 bucket_width,
|
||||||
const ContinuousAggsBucketFunction *bucket_function,
|
const ContinuousAggsBucketFunction *bucket_function,
|
||||||
scan_refresh_ranges_funct_t exec_func, void *func_arg1,
|
scan_refresh_ranges_funct_t exec_func, void *func_arg1,
|
||||||
void *func_arg2)
|
void *func_arg2)
|
||||||
@ -500,7 +500,7 @@ continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_windo
|
|||||||
|
|
||||||
InternalTimeRange bucketed_refresh_window =
|
InternalTimeRange bucketed_refresh_window =
|
||||||
compute_circumscribed_bucketed_refresh_window(&invalidation,
|
compute_circumscribed_bucketed_refresh_window(&invalidation,
|
||||||
max_bucket_width,
|
bucket_width,
|
||||||
bucket_function);
|
bucket_function);
|
||||||
|
|
||||||
(*exec_func)(&bucketed_refresh_window, count, func_arg1, func_arg2);
|
(*exec_func)(&bucketed_refresh_window, count, func_arg1, func_arg2);
|
||||||
@ -542,9 +542,9 @@ continuous_agg_scan_refresh_window_ranges(const InternalTimeRange *refresh_windo
|
|||||||
static void
|
static void
|
||||||
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
|
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
|
||||||
const InternalTimeRange *refresh_window,
|
const InternalTimeRange *refresh_window,
|
||||||
const InvalidationStore *invalidations,
|
const InvalidationStore *invalidations, const int64 bucket_width,
|
||||||
const int64 max_bucket_width, const int32 chunk_id,
|
const int32 chunk_id, const bool is_raw_ht_distributed,
|
||||||
const bool is_raw_ht_distributed, const bool do_merged_refresh,
|
const bool do_merged_refresh,
|
||||||
const InternalTimeRange merged_refresh_window)
|
const InternalTimeRange merged_refresh_window)
|
||||||
{
|
{
|
||||||
CaggRefreshState refresh;
|
CaggRefreshState refresh;
|
||||||
@ -560,8 +560,8 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
|
|||||||
{
|
{
|
||||||
Assert(merged_refresh_window.type == refresh_window->type);
|
Assert(merged_refresh_window.type == refresh_window->type);
|
||||||
Assert(merged_refresh_window.start >= refresh_window->start);
|
Assert(merged_refresh_window.start >= refresh_window->start);
|
||||||
Assert((max_bucket_width == BUCKET_WIDTH_VARIABLE) ||
|
Assert((bucket_width == BUCKET_WIDTH_VARIABLE) ||
|
||||||
(merged_refresh_window.end - max_bucket_width <= refresh_window->end));
|
(merged_refresh_window.end - bucket_width <= refresh_window->end));
|
||||||
|
|
||||||
log_refresh_window(DEBUG1,
|
log_refresh_window(DEBUG1,
|
||||||
cagg,
|
cagg,
|
||||||
@ -574,7 +574,7 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
|
|||||||
long count pg_attribute_unused();
|
long count pg_attribute_unused();
|
||||||
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
|
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
|
||||||
invalidations,
|
invalidations,
|
||||||
max_bucket_width,
|
bucket_width,
|
||||||
cagg->bucket_function,
|
cagg->bucket_function,
|
||||||
continuous_agg_refresh_execute_wrapper,
|
continuous_agg_refresh_execute_wrapper,
|
||||||
(void *) &refresh /* arg1 */,
|
(void *) &refresh /* arg1 */,
|
||||||
@ -666,14 +666,14 @@ emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext c
|
|||||||
void
|
void
|
||||||
continuous_agg_calculate_merged_refresh_window(const InternalTimeRange *refresh_window,
|
continuous_agg_calculate_merged_refresh_window(const InternalTimeRange *refresh_window,
|
||||||
const InvalidationStore *invalidations,
|
const InvalidationStore *invalidations,
|
||||||
const int64 max_bucket_width,
|
const int64 bucket_width,
|
||||||
const ContinuousAggsBucketFunction *bucket_function,
|
const ContinuousAggsBucketFunction *bucket_function,
|
||||||
InternalTimeRange *merged_refresh_window)
|
InternalTimeRange *merged_refresh_window)
|
||||||
{
|
{
|
||||||
long count pg_attribute_unused();
|
long count pg_attribute_unused();
|
||||||
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
|
count = continuous_agg_scan_refresh_window_ranges(refresh_window,
|
||||||
invalidations,
|
invalidations,
|
||||||
max_bucket_width,
|
bucket_width,
|
||||||
bucket_function,
|
bucket_function,
|
||||||
update_merged_refresh_window,
|
update_merged_refresh_window,
|
||||||
(void *) merged_refresh_window,
|
(void *) merged_refresh_window,
|
||||||
@ -743,13 +743,13 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
|
|||||||
"aggregate on creation.")));
|
"aggregate on creation.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
int64 max_bucket_width = ts_continuous_agg_bucket_width_variable(cagg) ?
|
int64 bucket_width = ts_continuous_agg_bucket_width_variable(cagg) ?
|
||||||
BUCKET_WIDTH_VARIABLE :
|
BUCKET_WIDTH_VARIABLE :
|
||||||
ts_continuous_agg_max_bucket_width(cagg);
|
ts_continuous_agg_bucket_width(cagg);
|
||||||
continuous_agg_refresh_with_window(cagg,
|
continuous_agg_refresh_with_window(cagg,
|
||||||
refresh_window,
|
refresh_window,
|
||||||
invalidations,
|
invalidations,
|
||||||
max_bucket_width,
|
bucket_width,
|
||||||
chunk_id,
|
chunk_id,
|
||||||
is_raw_ht_distributed,
|
is_raw_ht_distributed,
|
||||||
do_merged_refresh,
|
do_merged_refresh,
|
||||||
@ -809,7 +809,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
|||||||
{
|
{
|
||||||
refresh_window =
|
refresh_window =
|
||||||
compute_inscribed_bucketed_refresh_window(refresh_window_arg,
|
compute_inscribed_bucketed_refresh_window(refresh_window_arg,
|
||||||
ts_continuous_agg_max_bucket_width(cagg));
|
ts_continuous_agg_bucket_width(cagg));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (refresh_window.start >= refresh_window.end)
|
if (refresh_window.start >= refresh_window.end)
|
||||||
|
@ -25,7 +25,7 @@ extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
|
|||||||
extern Datum continuous_agg_refresh_chunk(PG_FUNCTION_ARGS);
|
extern Datum continuous_agg_refresh_chunk(PG_FUNCTION_ARGS);
|
||||||
extern void continuous_agg_calculate_merged_refresh_window(
|
extern void continuous_agg_calculate_merged_refresh_window(
|
||||||
const InternalTimeRange *refresh_window, const InvalidationStore *invalidations,
|
const InternalTimeRange *refresh_window, const InvalidationStore *invalidations,
|
||||||
const int64 max_bucket_width, const ContinuousAggsBucketFunction *bucket_function,
|
const int64 bucket_width, const ContinuousAggsBucketFunction *bucket_function,
|
||||||
InternalTimeRange *merged_refresh_window);
|
InternalTimeRange *merged_refresh_window);
|
||||||
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
||||||
const InternalTimeRange *refresh_window,
|
const InternalTimeRange *refresh_window,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user