Use NULL in CAgg bucket function catalog table

Historically, we have used an empty string for undefined values in the
catalog table continuous_aggs_bucket_function. Since #6624, the optional
arguments can be NULL. This patch cleans up the empty strings and
changes the logic to work with NULL values.
This commit is contained in:
Jan Nidzwetzki 2024-02-21 13:12:24 +01:00 committed by Jan Nidzwetzki
parent 17d424f849
commit fdf3aa3bfa
7 changed files with 94 additions and 50 deletions

View File

@ -146,3 +146,8 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_get_bucket_function(INTEGER)
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function UPDATE _timescaledb_catalog.continuous_aggs_bucket_function
SET bucket_origin = bucket_origin::timestamp::timestamptz::text SET bucket_origin = bucket_origin::timestamp::timestamptz::text
WHERE length(bucket_origin) > 1; WHERE length(bucket_origin) > 1;
-- Historically, we have used empty strings for undefined bucket_origin and bucket_timezone
-- attributes. These are now replaced by proper NULL values. We use TRIM() so that values consisting only of spaces are treated as empty strings as well.
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_origin = NULL WHERE TRIM(bucket_origin) = '';
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_timezone = NULL WHERE TRIM(bucket_timezone) = '';

View File

@ -3,6 +3,9 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.remove_dropped_chunk_metadata(INT
-- --
-- Rebuild the catalog table `_timescaledb_catalog.continuous_aggs_bucket_function` -- Rebuild the catalog table `_timescaledb_catalog.continuous_aggs_bucket_function`
-- --
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_origin = '' WHERE bucket_origin IS NULL;
UPDATE _timescaledb_catalog.continuous_aggs_bucket_function SET bucket_timezone = '' WHERE bucket_timezone IS NULL;
CREATE TABLE _timescaledb_catalog._tmp_continuous_aggs_bucket_function AS CREATE TABLE _timescaledb_catalog._tmp_continuous_aggs_bucket_function AS
SELECT SELECT
mat_hypertable_id, mat_hypertable_id,

View File

@ -413,12 +413,10 @@ continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucke
init_scan_cagg_bucket_function_by_mat_hypertable_id(&iterator, mat_hypertable_id); init_scan_cagg_bucket_function_by_mat_hypertable_id(&iterator, mat_hypertable_id);
ts_scanner_foreach(&iterator) ts_scanner_foreach(&iterator)
{ {
const char *bucket_width_str;
const char *origin_str;
Datum values[Natts_continuous_aggs_bucket_function]; Datum values[Natts_continuous_aggs_bucket_function];
bool isnull[Natts_continuous_aggs_bucket_function]; bool isnull[Natts_continuous_aggs_bucket_function];
bool should_free; bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free); HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
/* /*
@ -427,6 +425,7 @@ continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucke
*/ */
heap_deform_tuple(tuple, ts_scan_iterator_tupledesc(&iterator), values, isnull); heap_deform_tuple(tuple, ts_scan_iterator_tupledesc(&iterator), values, isnull);
/* Bucket function */
Assert(!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)]); Assert(!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)]);
bf->bucket_function = DatumGetObjectId( bf->bucket_function = DatumGetObjectId(
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)]); values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)]);
@ -434,34 +433,41 @@ continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucke
Assert(OidIsValid(bf->bucket_function)); Assert(OidIsValid(bf->bucket_function));
/* /*
* So far bucket_width is stored as TEXT for flexibility, but it's type * bucket_width
* most likely is going to change to Interval when the variable-sized *
* buckets feature will stabilize. * The value is stored as TEXT since we have to store the interval value of time
* buckets and also the number value of integer based buckets.
*/ */
Assert(!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)]); Assert(!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)]);
bucket_width_str = TextDatumGetCString( const char *bucket_width_str = TextDatumGetCString(
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)]); values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)]);
Assert(strlen(bucket_width_str) > 0); Assert(strlen(bucket_width_str) > 0);
bf->bucket_width = DatumGetIntervalP( bf->bucket_width = DatumGetIntervalP(
DirectFunctionCall3(interval_in, CStringGetDatum(bucket_width_str), InvalidOid, -1)); DirectFunctionCall3(interval_in, CStringGetDatum(bucket_width_str), InvalidOid, -1));
Assert( /* Bucket origin */
!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)]); if (!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)])
origin_str = TextDatumGetCString( {
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)]); const char *origin_str = TextDatumGetCString(values[AttrNumberGetAttrOffset(
if (strlen(origin_str) == 0) Anum_continuous_aggs_bucket_function_bucket_origin)]);
TIMESTAMP_NOBEGIN(bf->bucket_origin);
else
bf->bucket_origin = DatumGetTimestamp(DirectFunctionCall3(timestamptz_in, bf->bucket_origin = DatumGetTimestamp(DirectFunctionCall3(timestamptz_in,
CStringGetDatum(origin_str), CStringGetDatum(origin_str),
ObjectIdGetDatum(InvalidOid), ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1))); Int32GetDatum(-1)));
}
else
{
TIMESTAMP_NOBEGIN(bf->bucket_origin);
}
Assert( /* Bucket timezone */
!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)]); if (!isnull[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)])
bf->timezone = TextDatumGetCString( {
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)]); bf->timezone = TextDatumGetCString(values[AttrNumberGetAttrOffset(
Anum_continuous_aggs_bucket_function_bucket_timezone)]);
}
/* Bucket fixed width */
Assert(!isnull[AttrNumberGetAttrOffset( Assert(!isnull[AttrNumberGetAttrOffset(
Anum_continuous_aggs_bucket_function_bucket_fixed_width)]); Anum_continuous_aggs_bucket_function_bucket_fixed_width)]);
bf->bucket_fixed_interval = DatumGetBool(values[AttrNumberGetAttrOffset( bf->bucket_fixed_interval = DatumGetBool(values[AttrNumberGetAttrOffset(
@ -1351,16 +1357,13 @@ ts_continuous_agg_bucket_width(const ContinuousAgg *agg)
static Datum static Datum
generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp) generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
{ {
/* bf->timezone can't be NULL. If timezone is not specified, "" is stored */
Assert(bf->timezone != NULL);
FuncInfo *func_info = ts_func_cache_get_bucketing_func(bf->bucket_function); FuncInfo *func_info = ts_func_cache_get_bucketing_func(bf->bucket_function);
Ensure(func_info != NULL, "unable to get bucket function for Oid %d", bf->bucket_function); Ensure(func_info != NULL, "unable to get bucket function for Oid %d", bf->bucket_function);
bool is_experimental = func_info->origin == ORIGIN_TIMESCALE_EXPERIMENTAL; bool is_experimental = func_info->origin == ORIGIN_TIMESCALE_EXPERIMENTAL;
if (!is_experimental) if (!is_experimental)
{ {
if (strlen(bf->timezone) > 0) if (bf->timezone != NULL)
{ {
if (TIMESTAMP_NOT_FINITE(bf->bucket_origin)) if (TIMESTAMP_NOT_FINITE(bf->bucket_origin))
{ {
@ -1399,7 +1402,7 @@ generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
} }
else else
{ {
if (strlen(bf->timezone) > 0) if (bf->timezone != NULL)
{ {
if (TIMESTAMP_NOT_FINITE(bf->bucket_origin)) if (TIMESTAMP_NOT_FINITE(bf->bucket_origin))
{ {
@ -1449,12 +1452,7 @@ static Datum
generic_add_interval(const ContinuousAggsBucketFunction *bf, Datum timestamp) generic_add_interval(const ContinuousAggsBucketFunction *bf, Datum timestamp)
{ {
Datum tzname = 0; Datum tzname = 0;
bool has_timezone; bool has_timezone = (bf->timezone != NULL);
/* bf->timezone can't be NULL. If timezone is not specified, "" is stored */
Assert(bf->timezone != NULL);
has_timezone = (strlen(bf->timezone) > 0);
if (has_timezone) if (has_timezone)
{ {

View File

@ -93,7 +93,7 @@ typedef struct ContinuousAggsBucketFunction
TimestampTz bucket_origin; TimestampTz bucket_origin;
/* `bucket_offset` argument of the function. */ /* `bucket_offset` argument of the function. */
Interval *bucket_offest; Interval *bucket_offset;
/* `timezone` argument of the function provided by the user. */ /* `timezone` argument of the function provided by the user. */
char *timezone; char *timezone;

View File

@ -188,8 +188,8 @@ create_cagg_catalog_entry(int32 matht_id, int32 rawht_id, const char *user_schem
*/ */
static void static void
create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_function, const char *bucket_width, create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_function, const char *bucket_width,
const char *origin, const char *offset, const char *timezone, const char *bucket_origin, const char *bucket_offset,
const bool bucket_fixed_width) const char *bucket_timezone, const bool bucket_fixed_width)
{ {
Catalog *catalog = ts_catalog_get(); Catalog *catalog = ts_catalog_get();
Relation rel; Relation rel;
@ -203,18 +203,53 @@ create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_function, const
desc = RelationGetDescr(rel); desc = RelationGetDescr(rel);
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
/* Hypertable ID */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_mat_hypertable_id)] = values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_mat_hypertable_id)] =
matht_id; matht_id;
/* Bucket function */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)] = values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_function)] =
ObjectIdGetDatum(bucket_function); ObjectIdGetDatum(bucket_function);
/* Bucket width */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)] = values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)] =
CStringGetTextDatum(bucket_width); CStringGetTextDatum(bucket_width);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)] =
CStringGetTextDatum(origin); /* Bucket origin */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_offset)] = if (bucket_origin != NULL)
CStringGetTextDatum(offset); {
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)] = values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)] =
CStringGetTextDatum(timezone ? timezone : ""); CStringGetTextDatum(bucket_origin);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_origin)] = true;
}
/* Bucket offset */
if (bucket_offset != NULL)
{
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_offset)] =
CStringGetTextDatum(bucket_offset);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_offset)] = true;
}
/* Bucket timezone */
if (bucket_timezone != NULL)
{
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)] =
CStringGetTextDatum(bucket_timezone);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_timezone)] = true;
}
/* Bucket fixed width */
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_fixed_width)] = values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_fixed_width)] =
BoolGetDatum(bucket_fixed_width); BoolGetDatum(bucket_fixed_width);
@ -784,28 +819,27 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
if (bucket_info->bucket_width == BUCKET_WIDTH_VARIABLE) if (bucket_info->bucket_width == BUCKET_WIDTH_VARIABLE)
{ {
const char *bucket_width; const char *bucket_origin = NULL;
const char *origin = ""; const char *bucket_offset = NULL;
const char *offset = "";
/* /*
* Variable-sized buckets work only with intervals. * Variable-sized buckets work only with intervals.
*/ */
Assert(bucket_info->interval != NULL); Assert(bucket_info->interval != NULL);
bucket_width = DatumGetCString( const char *bucket_width = DatumGetCString(
DirectFunctionCall1(interval_out, IntervalPGetDatum(bucket_info->interval))); DirectFunctionCall1(interval_out, IntervalPGetDatum(bucket_info->interval)));
if (!TIMESTAMP_NOT_FINITE(bucket_info->origin)) if (!TIMESTAMP_NOT_FINITE(bucket_info->origin))
{ {
origin = DatumGetCString( bucket_origin = DatumGetCString(
DirectFunctionCall1(timestamptz_out, TimestampTzGetDatum(bucket_info->origin))); DirectFunctionCall1(timestamptz_out, TimestampTzGetDatum(bucket_info->origin)));
} }
create_bucket_function_catalog_entry(materialize_hypertable_id, create_bucket_function_catalog_entry(materialize_hypertable_id,
bucket_info->bucket_func->funcid, bucket_info->bucket_func->funcid,
bucket_width, bucket_width,
origin, bucket_origin,
offset, bucket_offset,
bucket_info->timezone, bucket_info->timezone,
bucket_info->bucket_width != BUCKET_WIDTH_VARIABLE); bucket_info->bucket_width != BUCKET_WIDTH_VARIABLE);
} }

View File

@ -70,14 +70,16 @@ WHERE mat_hypertable_id = :cagg_id;
-1 -1
(1 row) (1 row)
SELECT bucket_func, bucket_width, bucket_origin, bucket_timezone, bucket_fixed_width \pset null <NULL>
SELECT *
FROM _timescaledb_catalog.continuous_aggs_bucket_function FROM _timescaledb_catalog.continuous_aggs_bucket_function
WHERE mat_hypertable_id = :cagg_id; WHERE mat_hypertable_id = :cagg_id;
bucket_func | bucket_width | bucket_origin | bucket_timezone | bucket_fixed_width mat_hypertable_id | bucket_func | bucket_width | bucket_origin | bucket_offset | bucket_timezone | bucket_fixed_width
--------------------------------------------------------+--------------+---------------+-----------------+-------------------- -------------------+--------------------------------------------------------+--------------+---------------+---------------+-----------------+--------------------
timescaledb_experimental.time_bucket_ng(interval,date) | @ 1 mon | | | f 2 | timescaledb_experimental.time_bucket_ng(interval,date) | @ 1 mon | <NULL> | <NULL> | <NULL> | f
(1 row) (1 row)
\pset null ""
-- Check that the saved invalidation threshold is -infinity -- Check that the saved invalidation threshold is -infinity
SELECT _timescaledb_functions.to_timestamp(watermark) SELECT _timescaledb_functions.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold

View File

@ -70,9 +70,11 @@ SELECT bucket_width
FROM _timescaledb_catalog.continuous_agg FROM _timescaledb_catalog.continuous_agg
WHERE mat_hypertable_id = :cagg_id; WHERE mat_hypertable_id = :cagg_id;
SELECT bucket_func, bucket_width, bucket_origin, bucket_timezone, bucket_fixed_width \pset null <NULL>
SELECT *
FROM _timescaledb_catalog.continuous_aggs_bucket_function FROM _timescaledb_catalog.continuous_aggs_bucket_function
WHERE mat_hypertable_id = :cagg_id; WHERE mat_hypertable_id = :cagg_id;
\pset null ""
-- Check that the saved invalidation threshold is -infinity -- Check that the saved invalidation threshold is -infinity
SELECT _timescaledb_functions.to_timestamp(watermark) SELECT _timescaledb_functions.to_timestamp(watermark)