
Fix timestamp out of range

When the start or end of a refresh job is NULL, bucketing raises a
"timestamp out of range" error, because start and end are already set
to the minimum and maximum timestamp values before bucketing. Hence,
skip bucketing for this case.

Partly fixes 
Rafia Sabih 2022-10-07 16:25:46 +02:00
parent 684637a172
commit 043092a97f
16 changed files with 132 additions and 112 deletions
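In essence, the commit threads the NULL-ness of the user-supplied window bounds down to continuous_agg_refresh_internal() and skips bucket alignment when the window is fully open. Below is a minimal, self-contained C sketch of that guard, assuming a simplified int64 time axis; the time_bucket stand-in is hypothetical and only mimics the range check that makes the real bucketing functions reject the type's min/max values:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Stand-ins for the per-type bounds the real code fetches via
     * ts_time_get_min() / ts_time_get_end_or_max(). */
    #define TIME_MIN INT64_MIN
    #define TIME_MAX INT64_MAX

    typedef struct InternalTimeRange
    {
        int64_t start;
        int64_t end;
    } InternalTimeRange;

    /* Hypothetical bucketing stand-in: like TimescaleDB's time_bucket
     * functions, it rejects values at the extremes of the time type. */
    static int64_t
    time_bucket(int64_t width, int64_t ts)
    {
        if (ts == TIME_MIN || ts == TIME_MAX)
        {
            fprintf(stderr, "ERROR: timestamp out of range\n");
            exit(1);
        }
        return (ts / width) * width; /* truncation suffices for this sketch */
    }

    static InternalTimeRange
    compute_refresh_window(const InternalTimeRange *arg, bool start_isnull,
                           bool end_isnull, int64_t bucket_width)
    {
        InternalTimeRange w = *arg;

        /* The fix: no bucketing when open ended. NULL bounds were already
         * replaced by TIME_MIN/TIME_MAX, so bucketing them again would
         * trip the range check above. */
        if (!(start_isnull && end_isnull))
        {
            w.start = time_bucket(bucket_width, w.start);
            w.end = time_bucket(bucket_width, w.end);
        }
        return w;
    }

    int
    main(void)
    {
        InternalTimeRange open = { TIME_MIN, TIME_MAX };

        /* Both bounds NULL: bucketing is skipped, no error. Before the
         * fix, this path would have hit the range check. */
        InternalTimeRange w = compute_refresh_window(&open, true, true, 10);
        printf("window: [%lld, %lld]\n", (long long) w.start, (long long) w.end);
        return 0;
    }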

@@ -7,6 +7,7 @@ accidentally triggering the load of a previous DB version.**
 ## Unreleased

 **Bugfixes**
+* #4804 Skip bucketing when start or end of refresh job is null
 * #4926 Fix corruption when inserting into compressed chunks

 ## 2.9.2 (2023-01-26)

@@ -466,7 +466,8 @@ ts_date_offset_bucket(PG_FUNCTION_ARGS)
 /* when working with time_buckets stored in our catalog, we may not know ahead of time which
  * bucketing function to use, this function dynamically dispatches to the correct time_bucket_<foo>
- * based on an inputted timestamp_type*/
+ * based on an inputted timestamp_type
+ */
 TSDLLEXPORT int64
 ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
 {

@@ -112,22 +112,22 @@ get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json
 }

 int64
-policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config)
+policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, bool *start_isnull)
 {
-    bool start_isnull;
-    int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, &start_isnull);
+    int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, start_isnull);

     /* interpret NULL as min value for that type */
-    if (start_isnull)
+    if (*start_isnull)
         return ts_time_get_min(ts_dimension_get_partition_type(dim));
     return res;
 }

 int64
-policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config)
+policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config, bool *end_isnull)
 {
-    bool end_isnull;
-    int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_END_OFFSET, &end_isnull);
-    if (end_isnull)
+    int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_END_OFFSET, end_isnull);
+    if (*end_isnull)
         return ts_time_get_end_or_max(ts_dimension_get_partition_type(dim));
     return res;
 }

@@ -18,8 +18,10 @@ extern Datum policy_refresh_cagg_check(PG_FUNCTION_ARGS);
 extern Datum policy_refresh_cagg_remove(PG_FUNCTION_ARGS);

 int32 policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config);
-int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config);
-int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config);
+int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config,
+                                            bool *start_isnull);
+int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
+                                          bool *end_isnull);
 bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
                                           Datum cmp_interval);
 bool policy_refresh_cagg_exists(int32 materialization_id);

@@ -336,7 +336,9 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
     policy_refresh_cagg_read_and_validate_config(config, &policy_data);
     continuous_agg_refresh_internal(policy_data.cagg,
                                     &policy_data.refresh_window,
-                                    CAGG_REFRESH_POLICY);
+                                    CAGG_REFRESH_POLICY,
+                                    policy_data.start_is_null,
+                                    policy_data.end_is_null);

     return true;
 }
@@ -349,6 +351,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
     const Dimension *open_dim;
     Oid dim_type;
     int64 refresh_start, refresh_end;
+    bool start_isnull, end_isnull;

     materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config);
     mat_ht = ts_hypertable_get_by_id(materialization_id);
@@ -361,8 +364,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
     open_dim = get_open_dimension_for_hypertable(mat_ht);
     dim_type = ts_dimension_get_partition_type(open_dim);
-    refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config);
-    refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config);
+    refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config, &start_isnull);
+    refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config, &end_isnull);

     if (refresh_start >= refresh_end)
         ereport(ERROR,
@@ -379,6 +382,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
         policy_data->refresh_window.start = refresh_start;
         policy_data->refresh_window.end = refresh_end;
         policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id);
+        policy_data->start_is_null = start_isnull;
+        policy_data->end_is_null = end_isnull;
     }
 }

@@ -35,6 +35,7 @@ typedef struct PolicyContinuousAggData
 {
     InternalTimeRange refresh_window;
     ContinuousAgg *cagg;
+    bool start_is_null, end_is_null;
 } PolicyContinuousAggData;

 typedef struct PolicyCompressionData

@@ -2791,7 +2791,7 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
             ts_time_get_min(refresh_window.type);
         refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);
-        continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION);
+        continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true);
     }

     return DDL_DONE;
 }

@@ -573,7 +573,11 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
     else
         refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

-    continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_WINDOW);
+    continuous_agg_refresh_internal(cagg,
+                                    &refresh_window,
+                                    CAGG_REFRESH_WINDOW,
+                                    PG_ARGISNULL(1),
+                                    PG_ARGISNULL(2));

     PG_RETURN_VOID();
 }
@@ -698,11 +702,12 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
 void
 continuous_agg_refresh_internal(const ContinuousAgg *cagg,
                                 const InternalTimeRange *refresh_window_arg,
-                                const CaggRefreshCallContext callctx)
+                                const CaggRefreshCallContext callctx, const bool start_isnull,
+                                const bool end_isnull)
 {
     Catalog *catalog = ts_catalog_get();
     int32 mat_id = cagg->data.mat_hypertable_id;
-    InternalTimeRange refresh_window;
+    InternalTimeRange refresh_window = *refresh_window_arg;
     int64 computed_invalidation_threshold;
     int64 invalidation_threshold;
     bool is_raw_ht_distributed;
@@ -732,18 +737,22 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
     Hypertable *ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
     is_raw_ht_distributed = hypertable_is_distributed(ht);

-    if (ts_continuous_agg_bucket_width_variable(cagg))
+    /* No bucketing when open ended */
+    if (!(start_isnull && end_isnull))
     {
-        refresh_window = *refresh_window_arg;
-        ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
-                                                              &refresh_window.end,
-                                                              cagg->bucket_function);
-    }
-    else
-    {
-        refresh_window =
-            compute_inscribed_bucketed_refresh_window(refresh_window_arg,
-                                                      ts_continuous_agg_bucket_width(cagg));
+        if (ts_continuous_agg_bucket_width_variable(cagg))
+        {
+            refresh_window = *refresh_window_arg;
+            ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
+                                                                  &refresh_window.end,
+                                                                  cagg->bucket_function);
+        }
+        else
+        {
+            refresh_window =
+                compute_inscribed_bucketed_refresh_window(refresh_window_arg,
+                                                          ts_continuous_agg_bucket_width(cagg));
+        }
     }

     if (refresh_window.start >= refresh_window.end)

@@ -28,6 +28,7 @@ extern void continuous_agg_calculate_merged_refresh_window(
     InternalTimeRange *merged_refresh_window);
 extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
                                             const InternalTimeRange *refresh_window,
-                                            const CaggRefreshCallContext callctx);
+                                            const CaggRefreshCallContext callctx,
+                                            const bool start_isnull, const bool end_isnull);

 #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H */
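For quick reference, here is the contract of the two new parameters across the three call sites touched above, assembled only from the hunks in this commit, as an annotated copy of the new declaration:

    /*
     * start_isnull / end_isnull record whether the caller left the
     * corresponding refresh-window bound open:
     *
     *  - continuous_agg_refresh() passes PG_ARGISNULL(1) / PG_ARGISNULL(2),
     *    so refresh_continuous_aggregate(cagg, NULL, NULL) yields true/true;
     *  - policy_refresh_cagg_execute() passes policy_data.start_is_null /
     *    end_is_null, recorded while reading the offsets from the job config;
     *  - tsl_process_continuous_agg_viewstmt() hard-codes true/true, since
     *    the creation path always refreshes the full range (min to noend/max).
     */
    extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
                                                const InternalTimeRange *refresh_window,
                                                const CaggRefreshCallContext callctx,
                                                const bool start_isnull, const bool end_isnull);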

@@ -559,17 +559,16 @@ SELECT * FROM cagg_invals;
 -- Test max refresh window
 CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
 SELECT * FROM cagg_invals;
- cagg_id | start | end
----------+----------------------+----------------------
- 3 | -9223372036854775808 | -9223372036854775801
- 3 | 110 | 9223372036854775807
- 4 | -9223372036854775808 | 19
- 4 | 0 | 59
- 4 | 20 | 59
- 4 | 60 | 9223372036854775807
- 5 | -9223372036854775808 | -1
- 5 | 30 | 9223372036854775807
-(8 rows)
+ cagg_id | start | end
+---------+----------------------+---------------------
+ 3 | 110 | 9223372036854775807
+ 4 | -9223372036854775808 | 19
+ 4 | 0 | 59
+ 4 | 20 | 59
+ 4 | 60 | 9223372036854775807
+ 5 | -9223372036854775808 | -1
+ 5 | 30 | 9223372036854775807
+(7 rows)

 SELECT * FROM hyper_invals;
  hyper_id | start | end

@@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
 -- Test max refresh window
 CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
 SELECT * FROM cagg_invals;
- cagg_id | start | end
----------+----------------------+----------------------
- 3 | -9223372036854775808 | -9223372036854775801
- 3 | 110 | 9223372036854775807
- 4 | -9223372036854775808 | 19
- 4 | 0 | 59
- 4 | 20 | 39
- 4 | 20 | 59
- 4 | 60 | 9223372036854775807
- 5 | -9223372036854775808 | -1
- 5 | 30 | 9223372036854775807
-(9 rows)
+ cagg_id | start | end
+---------+----------------------+---------------------
+ 3 | 110 | 9223372036854775807
+ 4 | -9223372036854775808 | 19
+ 4 | 0 | 59
+ 4 | 20 | 39
+ 4 | 20 | 59
+ 4 | 60 | 9223372036854775807
+ 5 | -9223372036854775808 | -1
+ 5 | 30 | 9223372036854775807
+(8 rows)

 SELECT * FROM hyper_invals;
  hyper_id | start | end

@@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
 -- Test max refresh window
 CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
 SELECT * FROM cagg_invals;
- cagg_id | start | end
----------+----------------------+----------------------
- 3 | -9223372036854775808 | -9223372036854775801
- 3 | 110 | 9223372036854775807
- 4 | -9223372036854775808 | 19
- 4 | 0 | 59
- 4 | 20 | 39
- 4 | 20 | 59
- 4 | 60 | 9223372036854775807
- 5 | -9223372036854775808 | -1
- 5 | 30 | 9223372036854775807
-(9 rows)
+ cagg_id | start | end
+---------+----------------------+---------------------
+ 3 | 110 | 9223372036854775807
+ 4 | -9223372036854775808 | 19
+ 4 | 0 | 59
+ 4 | 20 | 39
+ 4 | 20 | 59
+ 4 | 60 | 9223372036854775807
+ 5 | -9223372036854775808 | -1
+ 5 | 30 | 9223372036854775807
+(8 rows)

 SELECT * FROM hyper_invals;
  hyper_id | start | end

@@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
 -- Test max refresh window
 CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
 SELECT * FROM cagg_invals;
- cagg_id | start | end
----------+----------------------+----------------------
- 3 | -9223372036854775808 | -9223372036854775801
- 3 | 110 | 9223372036854775807
- 4 | -9223372036854775808 | 19
- 4 | 0 | 59
- 4 | 20 | 39
- 4 | 20 | 59
- 4 | 60 | 9223372036854775807
- 5 | -9223372036854775808 | -1
- 5 | 30 | 9223372036854775807
-(9 rows)
+ cagg_id | start | end
+---------+----------------------+---------------------
+ 3 | 110 | 9223372036854775807
+ 4 | -9223372036854775808 | 19
+ 4 | 0 | 59
+ 4 | 20 | 39
+ 4 | 20 | 59
+ 4 | 60 | 9223372036854775807
+ 5 | -9223372036854775808 | -1
+ 5 | 30 | 9223372036854775807
+(8 rows)

 SELECT * FROM hyper_invals;
  hyper_id | start | end

@@ -619,18 +619,17 @@ SELECT * FROM cagg_invals;
 -- Test max refresh window
 CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
 SELECT * FROM cagg_invals;
- cagg_id | start | end
----------+----------------------+----------------------
- 3 | -9223372036854775808 | -9223372036854775801
- 3 | 110 | 9223372036854775807
- 4 | -9223372036854775808 | 19
- 4 | 0 | 59
- 4 | 20 | 39
- 4 | 20 | 59
- 4 | 60 | 9223372036854775807
- 5 | -9223372036854775808 | -1
- 5 | 30 | 9223372036854775807
-(9 rows)
+ cagg_id | start | end
+---------+----------------------+---------------------
+ 3 | 110 | 9223372036854775807
+ 4 | -9223372036854775808 | 19
+ 4 | 0 | 59
+ 4 | 20 | 39
+ 4 | 20 | 59
+ 4 | 60 | 9223372036854775807
+ 5 | -9223372036854775808 | -1
+ 5 | 30 | 9223372036854775807
+(8 rows)

 SELECT * FROM hyper_invals;
  hyper_id | start | end

@@ -910,6 +910,15 @@ SELECT create_hypertable(
 (17,public,conditions_timestamptz,t)
 (1 row)

+-- Add some data to the hypertable and make sure it is visible in the cagg
+INSERT INTO conditions_timestamptz(tstamp, city, temperature)
+SELECT ts, city,
+       (CASE WHEN city = 'Moscow' THEN 20000 ELSE 10000 END) +
+       date_part('day', ts at time zone 'MSK')*100 +
+       date_part('hour', ts at time zone 'MSK')
+FROM
+generate_series('2022-01-01 00:00:00 MSK' :: timestamptz, '2022-01-02 00:00:00 MSK' :: timestamptz - interval '1 hour', '1 hour') as ts,
+unnest(array['Moscow', 'Berlin']) as city;
 \set ON_ERROR_STOP 0
 -- For monthly buckets origin should be the first day of the month in given timezone
 -- 2020-06-02 00:00:00 MSK == 2020-06-01 21:00:00 UTC
@@ -941,23 +950,18 @@ SELECT city,
        MAX(temperature)
 FROM conditions_timestamptz
 GROUP BY city, bucket;
-NOTICE: continuous aggregate "conditions_summary_timestamptz" is already up-to-date
+NOTICE: refreshing continuous aggregate "conditions_summary_timestamptz"
 SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS b, min, max
 FROM conditions_summary_timestamptz
 ORDER BY b, city;
- city | b | min | max
-------+---+-----+-----
-(0 rows)
+ city | b | min | max
+--------+---------------------+-------+-------
+ Berlin | 2022-01-01 00:00:00 | 10100 | 10111
+ Moscow | 2022-01-01 00:00:00 | 20100 | 20111
+ Berlin | 2022-01-01 12:00:00 | 10112 | 10123
+ Moscow | 2022-01-01 12:00:00 | 20112 | 20123
+(4 rows)

--- Add some data to the hypertable and make sure it is visible in the cagg
-INSERT INTO conditions_timestamptz(tstamp, city, temperature)
-SELECT ts, city,
-       (CASE WHEN city = 'Moscow' THEN 20000 ELSE 10000 END) +
-       date_part('day', ts at time zone 'MSK')*100 +
-       date_part('hour', ts at time zone 'MSK')
-FROM
-generate_series('2022-01-01 00:00:00 MSK' :: timestamptz, '2022-01-02 00:00:00 MSK' :: timestamptz - interval '1 hour', '1 hour') as ts,
-unnest(array['Moscow', 'Berlin']) as city;
 -- Check the data
 SELECT to_char(tstamp at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS ts, city, temperature FROM conditions_timestamptz
 ORDER BY ts, city;
@@ -1026,6 +1030,7 @@ ORDER BY b, city;
 -- Refresh the cagg and make sure that the result of SELECT query didn't change
 CALL refresh_continuous_aggregate('conditions_summary_timestamptz', '2022-01-01 00:00:00 MSK', '2022-01-02 00:00:00 MSK');
+NOTICE: continuous aggregate "conditions_summary_timestamptz" is already up-to-date
 SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS b, min, max
 FROM conditions_summary_timestamptz
 ORDER BY b, city;

@@ -557,6 +557,16 @@ SELECT create_hypertable(
   chunk_time_interval => INTERVAL '1 day'
 );

+-- Add some data to the hypertable and make sure it is visible in the cagg
+INSERT INTO conditions_timestamptz(tstamp, city, temperature)
+SELECT ts, city,
+       (CASE WHEN city = 'Moscow' THEN 20000 ELSE 10000 END) +
+       date_part('day', ts at time zone 'MSK')*100 +
+       date_part('hour', ts at time zone 'MSK')
+FROM
+generate_series('2022-01-01 00:00:00 MSK' :: timestamptz, '2022-01-02 00:00:00 MSK' :: timestamptz - interval '1 hour', '1 hour') as ts,
+unnest(array['Moscow', 'Berlin']) as city;
+
 \set ON_ERROR_STOP 0

 -- For monthly buckets origin should be the first day of the month in given timezone
@@ -595,16 +605,6 @@ SELECT city, to_char(bucket at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS b, m
 FROM conditions_summary_timestamptz
 ORDER BY b, city;

--- Add some data to the hypertable and make sure it is visible in the cagg
-INSERT INTO conditions_timestamptz(tstamp, city, temperature)
-SELECT ts, city,
-       (CASE WHEN city = 'Moscow' THEN 20000 ELSE 10000 END) +
-       date_part('day', ts at time zone 'MSK')*100 +
-       date_part('hour', ts at time zone 'MSK')
-FROM
-generate_series('2022-01-01 00:00:00 MSK' :: timestamptz, '2022-01-02 00:00:00 MSK' :: timestamptz - interval '1 hour', '1 hour') as ts,
-unnest(array['Moscow', 'Berlin']) as city;
-
 -- Check the data
 SELECT to_char(tstamp at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') AS ts, city, temperature FROM conditions_timestamptz
 ORDER BY ts, city;