diff --git a/tsl/src/continuous_aggs/invalidation.c b/tsl/src/continuous_aggs/invalidation.c index 054dae3c3..e54574da8 100644 --- a/tsl/src/continuous_aggs/invalidation.c +++ b/tsl/src/continuous_aggs/invalidation.c @@ -361,7 +361,7 @@ cut_invalidation_along_refresh_window(const CaggInvalidationState *state, /* * Entry completely enclosed so can be deleted: * - * |---------------| + * [---------------) * [+++++] */ @@ -380,7 +380,7 @@ cut_invalidation_along_refresh_window(const CaggInvalidationState *state, /* * Need to cut in right end: * - * |------| + * [------) * [++++++] * * [++] @@ -403,13 +403,21 @@ cut_invalidation_along_refresh_window(const CaggInvalidationState *state, if (invalidation->lowest_modified_value < refresh_window->end && invalidation->greatest_modified_value >= refresh_window->end) { + /* + * If the invalidation is already cut on the left above, the reminder is set and + * will be reset here. The assert prevents from losing information from the reminder. + */ + Assert((result == INVAL_CUT && + remainder->lowest_modified_value == refresh_window->start) || + result == INVAL_NOMATCH); + /* * Need to cut in left end: * - * |------| + * [------) * [++++++++] * - * [+++] + * [++++] */ upper = create_invalidation_tup(tupdesc, cagg_hyper_id, diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 82f6e5aad..e5df40836 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -78,50 +78,134 @@ get_largest_bucketed_window(Oid timetype, int64 bucket_width) } /* - * Adjust the refresh window to align with buckets in an inclusive manner. + * Adjust the refresh window to align with inscribed buckets, so it includes buckets, which are + * fully covered by the refresh window. * - * It is OK to refresh more than the given refresh window, but not less. Since - * we can only refresh along bucket boundaries, we need to adjust the refresh - * window to be inclusive in both ends to be able to refresh the given - * region. For example, if the dotted region below is the original window, the - * adjusted refresh window includes all four buckets shown. + * Bucketing refresh window is necessary for a continuous aggregate refresh, which can refresh only + * entire buckets. The result of the function is a bucketed window, where its start is at the start + * of the first bucket, which is fully inside the refresh window, and its end is at the end of the + * last fully covered bucket. * - * | ....|.....|.. | + * Example1, the window needs to shrink: + * [---------) - given refresh window + * .|....|....|....|. - buckets + * [----) - inscribed bucketed window + * + * Example2, the window is already aligned: + * [----) - given refresh window + * .|....|....|....|. - buckets + * [----) - inscribed bucketed window + * + * This function is called for the continuous aggregate policy and manual refresh. In such case + * excluding buckets, which are not fully covered by the refresh window, avoids refreshing a bucket, + * where part of its data were dropped by a retention policy. See #2198 for details. */ static InternalTimeRange -compute_bucketed_refresh_window(const InternalTimeRange *refresh_window, int64 bucket_width) +compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window, + const int64 bucket_width) { InternalTimeRange result = *refresh_window; InternalTimeRange largest_bucketed_window = get_largest_bucketed_window(refresh_window->type, bucket_width); - if (result.start <= largest_bucketed_window.start) + if (refresh_window->start <= largest_bucketed_window.start) + { result.start = largest_bucketed_window.start; - else - result.start = ts_time_bucket_by_type(bucket_width, result.start, result.type); - - if (result.end >= largest_bucketed_window.end) - result.end = largest_bucketed_window.end; + } else { - int64 exclusive_end = result.end; + /* The start time needs to be aligned with the first fully enclosed bucket. + * So the original window start is moved to next bucket, except if the start is + * already aligned with a bucket, thus 1 is subtracted to avoid moving into next + * bucket in the aligned case. */ + int64 included_bucket = + ts_time_saturating_add(refresh_window->start, bucket_width - 1, refresh_window->type); + /* Get the start of the included bucket. */ + result.start = ts_time_bucket_by_type(bucket_width, included_bucket, refresh_window->type); + } + + if (refresh_window->end >= largest_bucketed_window.end) + { + result.end = largest_bucketed_window.end; + } + else + { + /* The window is reduced to the beginning of the bucket, which contains the exclusive + * end of the refresh window. */ + result.end = + ts_time_bucket_by_type(bucket_width, refresh_window->end, refresh_window->type); + } + return result; +} + +/* + * Adjust the refresh window to align with circumscribed buckets, so it includes buckets, which + * fully cover the refresh window. + * + * Bucketing refresh window is necessary for a continuous aggregate refresh, which can refresh only + * entire buckets. The result of the function is a bucketed window, where its start is at the start + * of a bucket, which contains the start of the refresh window, and its end is at the end of a + * bucket, which contains the end of the refresh window. + * + * Example1, the window needs to expand: + * [---------) - given refresh window + * .|....|....|....|. - buckets + * [--------------) - circumscribed bucketed window + * + * Example2, the window is already aligned: + * [----) - given refresh window + * .|....|....|....|. - buckets + * [----) - inscribed bucketed window + * + * This function is called for an invalidation window before refreshing it and after the + * invalidation window was adjusted to be fully inside a refresh window. In the case of a + * continuous aggregate policy or manual refresh, the refresh window is the inscribed bucketed + * window. + * + * The circumscribed behaviour is also used for a refresh on drop, when the refresh is called during + * dropping chunks manually or as part of retention policy. + */ +static InternalTimeRange +compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window, + const int64 bucket_width) +{ + InternalTimeRange result = *refresh_window; + InternalTimeRange largest_bucketed_window = + get_largest_bucketed_window(refresh_window->type, bucket_width); + + if (refresh_window->start <= largest_bucketed_window.start) + { + result.start = largest_bucketed_window.start; + } + else + { + /* For alignment with a bucket, which includes the start of the refresh window, we just + * need to get start of the bucket. */ + result.start = + ts_time_bucket_by_type(bucket_width, refresh_window->start, refresh_window->type); + } + + if (refresh_window->end >= largest_bucketed_window.end) + { + result.end = largest_bucketed_window.end; + } + else + { + int64 exclusive_end; int64 bucketed_end; + Assert(refresh_window->end > result.start); + /* The end of the window is non-inclusive so subtract one before * bucketing in case we're already at the end of the bucket (we don't - * want to add an extra bucket). But we also don't want to subtract if - * we are at the start of the bucket (we don't want to remove a - * bucket). The last */ - if (result.end > result.start) - exclusive_end = ts_time_saturating_sub(result.end, 1, result.type); - - bucketed_end = ts_time_bucket_by_type(bucket_width, exclusive_end, result.type); + * want to add an extra bucket). */ + exclusive_end = ts_time_saturating_sub(refresh_window->end, 1, refresh_window->type); + bucketed_end = ts_time_bucket_by_type(bucket_width, exclusive_end, refresh_window->type); /* We get the time value for the start of the bucket, so need to add - * bucket_width to get the end of it */ + * bucket_width to get the end of it. */ result.end = ts_time_saturating_add(bucketed_end, bucket_width, refresh_window->type); } - return result; } @@ -235,7 +319,7 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg, }; InternalTimeRange bucketed_refresh_window = - compute_bucketed_refresh_window(&invalidation, cagg->data.bucket_width); + compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width); log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on"); continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window); @@ -336,9 +420,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, Assert(OidIsValid(cagg->relid)); ereport(NOTICE, (errmsg("refreshing continuous aggregate \"%s\"", get_rel_name(cagg->relid)), - errhint( - "Use WITH NO DATA if you do not want to refresh the continuous aggregate " - "on creation."))); + errhint("Use WITH NO DATA if you do not want to refresh the continuous " + "aggregate on creation."))); } continuous_agg_refresh_with_window(cagg, refresh_window, invalidations); invalidation_store_free(invalidations); @@ -381,7 +464,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, errmsg("invalid refresh window"), errhint("The start of the window must be before the end."))); - refresh_window = compute_bucketed_refresh_window(refresh_window_arg, cagg->data.bucket_width); + refresh_window = + compute_inscribed_bucketed_refresh_window(refresh_window_arg, cagg->data.bucket_width); log_refresh_window(DEBUG1, cagg, &refresh_window, "refreshing continuous aggregate"); /* Perform the refresh across two transactions. @@ -419,17 +503,15 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, * won't be refreshed when the threshold is moved forward in the * future. The invalidation threshold should already be aligned on bucket * boundary. */ - if (refresh_window.end > invalidation_threshold) - { + if (refresh_window_arg->end > invalidation_threshold) refresh_window.end = invalidation_threshold; - /* Capping the end might have made the window 0, or negative, so - * nothing to refresh in that case */ - if (refresh_window.start >= refresh_window.end) - { - emit_up_to_date_notice(cagg); - return; - } + /* Capping the end might have made the window 0, or negative, so + * nothing to refresh in that case */ + if (refresh_window.start >= refresh_window.end) + { + emit_up_to_date_notice(cagg); + return; } /* Process invalidations in the hypertable invalidation log */ diff --git a/tsl/test/expected/continuous_aggs_invalidation.out b/tsl/test/expected/continuous_aggs_invalidation.out index 034a3f2be..680d92a0f 100644 --- a/tsl/test/expected/continuous_aggs_invalidation.out +++ b/tsl/test/expected/continuous_aggs_invalidation.out @@ -158,7 +158,7 @@ SELECT * FROM cagg_invals; 5 | -9223372036854775808 | 9223372036854775807 (3 rows) --- Now refresh up to 50, and the threshold should be updated accordingly: +-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly: CALL refresh_continuous_aggregate('cond_10', 1, 50); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; @@ -167,7 +167,18 @@ ORDER BY 1,2; 1 | 50 (1 row) --- Invalidations should be cleared for the refresh window: +-- Invalidations should be cleared inside the refresh window: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | 9 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refresh up to 50 from the beginning +CALL refresh_continuous_aggregate('cond_10', 0, 50); SELECT * FROM cagg_invals; cagg_id | start | end ---------+----------------------+--------------------- @@ -199,7 +210,7 @@ SELECT * FROM cagg_invals; (4 rows) -- Refreshing measure_10 moves the threshold only for the other hypertable: -CALL refresh_continuous_aggregate('measure_10', 1, 30); +CALL refresh_continuous_aggregate('measure_10', 0, 30); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; hypertable_id | watermark @@ -412,7 +423,7 @@ SELECT * FROM cagg_invals; (7 rows) -- Refresh cond_10 to completely remove an invalidation: -CALL refresh_continuous_aggregate('cond_10', 1, 20); +CALL refresh_continuous_aggregate('cond_10', 0, 20); -- The 1-19 invalidation should be deleted: SELECT * FROM cagg_invals; cagg_id | start | end @@ -869,14 +880,15 @@ ORDER BY 1,2; INSERT INTO threshold_test SELECT v, v FROM generate_series(1, 10) v; CALL refresh_continuous_aggregate('thresh_2', 0, 5); --- Threshold should move to end of refresh window (note that window --- expands to end of bucket). +-- Threshold should move to end of the last refreshed bucket, which is +-- the last bucket fully included in the window, i.e., the window +-- shrinks to end of previous bucket. SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; hypertable_id | watermark ---------------+----------- - 7 | 6 + 7 | 4 (1 row) -- Refresh where both the start and end of the window is above the diff --git a/tsl/test/expected/continuous_aggs_policy.out b/tsl/test/expected/continuous_aggs_policy.out index 916b7ae8b..8a954ea70 100644 --- a/tsl/test/expected/continuous_aggs_policy.out +++ b/tsl/test/expected/continuous_aggs_policy.out @@ -146,6 +146,7 @@ WHERE id = :job_id; INSERT INTO continuous_agg_max_mat_date SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day'); CALL run_job(:job_id); +NOTICE: continuous aggregate "max_mat_view_date" is already up-to-date DROP MATERIALIZED VIEW max_mat_view_date; CREATE TABLE continuous_agg_timestamp(time TIMESTAMP); SELECT create_hypertable('continuous_agg_timestamp', 'time'); diff --git a/tsl/test/expected/continuous_aggs_refresh.out b/tsl/test/expected/continuous_aggs_refresh.out index 4bffef469..73ba3bd41 100644 --- a/tsl/test/expected/continuous_aggs_refresh.out +++ b/tsl/test/expected/continuous_aggs_refresh.out @@ -59,7 +59,7 @@ ORDER BY day DESC, device; (0 rows) -- Refresh the most recent few days: -CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-05'); +CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00'); SELECT * FROM daily_temp ORDER BY day DESC, device; day | device | avg_temp @@ -80,7 +80,7 @@ ORDER BY day DESC, device; -- Refresh the rest (and try DEBUG output) SET client_min_messages TO DEBUG1; -CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03'); +CALL refresh_continuous_aggregate('daily_temp', '2020-04-30', '2020-05-04'); DEBUG: refreshing continuous aggregate "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sun May 03 17:00:00 2020 PDT ] DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000 DEBUG: invalidation refresh on "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sat May 02 17:00:00 2020 PDT ] @@ -243,7 +243,7 @@ AS SELECT time_bucket(SMALLINT '20', time) AS bucket, device, avg(temp) AS avg_temp FROM conditions_smallint c GROUP BY 1,2 WITH NO DATA; -CALL refresh_continuous_aggregate('cond_20_smallint', 5::smallint, 50::smallint); +CALL refresh_continuous_aggregate('cond_20_smallint', 0::smallint, 70::smallint); SELECT * FROM cond_20_smallint ORDER BY 1,2; bucket | device | avg_temp @@ -294,7 +294,7 @@ AS SELECT time_bucket(INT '20', time) AS bucket, device, avg(temp) AS avg_temp FROM conditions_int GROUP BY 1,2 WITH NO DATA; -CALL refresh_continuous_aggregate('cond_20_int', 5, 50); +CALL refresh_continuous_aggregate('cond_20_int', 0, 65); SELECT * FROM cond_20_int ORDER BY 1,2; bucket | device | avg_temp @@ -345,7 +345,7 @@ AS SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp FROM conditions_bigint GROUP BY 1,2 WITH NO DATA; -CALL refresh_continuous_aggregate('cond_20_bigint', 5, 50); +CALL refresh_continuous_aggregate('cond_20_bigint', 0, 75); SELECT * FROM cond_20_bigint ORDER BY 1,2; bucket | device | avg_temp diff --git a/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out b/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out index 6493a6048..af1b7c9d2 100644 --- a/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out +++ b/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out @@ -2,7 +2,7 @@ Parsed test spec with 8 sessions starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table step R1_refresh: - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); step S1_select: SELECT bucket, avg_temp @@ -31,7 +31,7 @@ hypertable threshold conditions 70 step R3_refresh: - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); step S1_select: SELECT bucket, avg_temp @@ -78,7 +78,7 @@ step L2_read_lock_threshold_table: IN ACCESS SHARE MODE; step R3_refresh: - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); step L2_read_unlock_threshold_table: ROLLBACK; @@ -118,14 +118,14 @@ step L1_unlock_threshold_table: starting permutation: R1_refresh L2_read_lock_threshold_table R3_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table step R1_refresh: - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); step L2_read_lock_threshold_table: LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold IN ACCESS SHARE MODE; step R3_refresh: - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); step L2_read_unlock_threshold_table: ROLLBACK; @@ -169,14 +169,14 @@ step L1_unlock_threshold_table: starting permutation: R3_refresh L2_read_lock_threshold_table R1_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table step R3_refresh: - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); step L2_read_lock_threshold_table: LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold IN ACCESS SHARE MODE; step R1_refresh: - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); step L2_read_unlock_threshold_table: ROLLBACK; @@ -226,7 +226,7 @@ lock_cagg step R1_refresh: - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); step L3_unlock_cagg_table: ROLLBACK; @@ -273,7 +273,7 @@ lock_cagg step R1_refresh: - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); step R2_refresh: CALL refresh_continuous_aggregate('cond_10', 35, 62); @@ -325,10 +325,10 @@ lock_cagg step R1_refresh: - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); step R3_refresh: - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); step L3_unlock_cagg_table: ROLLBACK; @@ -379,7 +379,7 @@ lock_cagg step R3_refresh: - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); step R4_refresh: CALL refresh_continuous_aggregate('cond_20', 39, 84); diff --git a/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec b/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec index 51a2f211d..4bb89472e 100644 --- a/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec +++ b/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec @@ -128,7 +128,7 @@ setup } step "R1_refresh" { - CALL refresh_continuous_aggregate('cond_10', 35, 62); + CALL refresh_continuous_aggregate('cond_10', 25, 70); } @@ -154,7 +154,7 @@ setup } step "R3_refresh" { - CALL refresh_continuous_aggregate('cond_10', 71, 97); + CALL refresh_continuous_aggregate('cond_10', 70, 107); } # Overlapping refresh on another continuous aggregate (cond_20) diff --git a/tsl/test/sql/continuous_aggs_invalidation.sql b/tsl/test/sql/continuous_aggs_invalidation.sql index 6b56f6275..35aa18820 100644 --- a/tsl/test/sql/continuous_aggs_invalidation.sql +++ b/tsl/test/sql/continuous_aggs_invalidation.sql @@ -110,12 +110,16 @@ ORDER BY 1,2; -- invalidation log: SELECT * FROM cagg_invals; --- Now refresh up to 50, and the threshold should be updated accordingly: +-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly: CALL refresh_continuous_aggregate('cond_10', 1, 50); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; --- Invalidations should be cleared for the refresh window: +-- Invalidations should be cleared inside the refresh window: +SELECT * FROM cagg_invals; + +-- Refresh up to 50 from the beginning +CALL refresh_continuous_aggregate('cond_10', 0, 50); SELECT * FROM cagg_invals; -- Refreshing below the threshold does not move it: @@ -128,7 +132,7 @@ ORDER BY 1,2; SELECT * FROM cagg_invals; -- Refreshing measure_10 moves the threshold only for the other hypertable: -CALL refresh_continuous_aggregate('measure_10', 1, 30); +CALL refresh_continuous_aggregate('measure_10', 0, 30); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; SELECT * FROM cagg_invals; @@ -218,7 +222,7 @@ CALL refresh_continuous_aggregate('cond_20', 20, 60); SELECT * FROM cagg_invals; -- Refresh cond_10 to completely remove an invalidation: -CALL refresh_continuous_aggregate('cond_10', 1, 20); +CALL refresh_continuous_aggregate('cond_10', 0, 20); -- The 1-19 invalidation should be deleted: SELECT * FROM cagg_invals; @@ -479,8 +483,9 @@ SELECT v, v FROM generate_series(1, 10) v; CALL refresh_continuous_aggregate('thresh_2', 0, 5); --- Threshold should move to end of refresh window (note that window --- expands to end of bucket). +-- Threshold should move to end of the last refreshed bucket, which is +-- the last bucket fully included in the window, i.e., the window +-- shrinks to end of previous bucket. SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; diff --git a/tsl/test/sql/continuous_aggs_refresh.sql b/tsl/test/sql/continuous_aggs_refresh.sql index 000af184d..74ec53f87 100644 --- a/tsl/test/sql/continuous_aggs_refresh.sql +++ b/tsl/test/sql/continuous_aggs_refresh.sql @@ -34,14 +34,14 @@ SELECT * FROM daily_temp ORDER BY day DESC, device; -- Refresh the most recent few days: -CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-05'); +CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00'); SELECT * FROM daily_temp ORDER BY day DESC, device; -- Refresh the rest (and try DEBUG output) SET client_min_messages TO DEBUG1; -CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03'); +CALL refresh_continuous_aggregate('daily_temp', '2020-04-30', '2020-05-04'); RESET client_min_messages; -- Compare the aggregate to the equivalent query on the source table @@ -140,7 +140,7 @@ SELECT time_bucket(SMALLINT '20', time) AS bucket, device, avg(temp) AS avg_temp FROM conditions_smallint c GROUP BY 1,2 WITH NO DATA; -CALL refresh_continuous_aggregate('cond_20_smallint', 5::smallint, 50::smallint); +CALL refresh_continuous_aggregate('cond_20_smallint', 0::smallint, 70::smallint); SELECT * FROM cond_20_smallint ORDER BY 1,2; @@ -173,7 +173,7 @@ SELECT time_bucket(INT '20', time) AS bucket, device, avg(temp) AS avg_temp FROM conditions_int GROUP BY 1,2 WITH NO DATA; -CALL refresh_continuous_aggregate('cond_20_int', 5, 50); +CALL refresh_continuous_aggregate('cond_20_int', 0, 65); SELECT * FROM cond_20_int ORDER BY 1,2; @@ -206,7 +206,7 @@ SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp FROM conditions_bigint GROUP BY 1,2 WITH NO DATA; -CALL refresh_continuous_aggregate('cond_20_bigint', 5, 50); +CALL refresh_continuous_aggregate('cond_20_bigint', 0, 75); SELECT * FROM cond_20_bigint ORDER BY 1,2;