Change cagg refresh to cover buckets

Refresh of a continuous aggregate was expanding refresh window to
include buckets, which contain the start and end of the window. This
was leading to refreshing dropped data into the first bucket in the
corner case, when drop_before of a retention policy is the same as
start_offset of a continuous aggregate policy and the last dropped
chunk happens to intersect with the first bucket. See #2198 for
detailed discussion.

The behavior of a refresh, which is called when chunks are dropped, is
not changed, i.e., buckets, which fully cover chunks to be dropped,
are refreshed if needed (i.e., there were changes in the chunks, which
were not refreshed yet). It can be done separately if needed.

Fixes #2198
This commit is contained in:
Ruslan Fomkin 2020-09-24 14:19:43 +02:00 committed by Ruslan Fomkin
parent 423e8fa527
commit 66c63476e5
9 changed files with 187 additions and 79 deletions

View File

@ -361,7 +361,7 @@ cut_invalidation_along_refresh_window(const CaggInvalidationState *state,
/*
* Entry completely enclosed so can be deleted:
*
* |---------------|
* [---------------)
* [+++++]
*/
@ -380,7 +380,7 @@ cut_invalidation_along_refresh_window(const CaggInvalidationState *state,
/*
* Need to cut in right end:
*
* |------|
* [------)
* [++++++]
*
* [++]
@ -403,13 +403,21 @@ cut_invalidation_along_refresh_window(const CaggInvalidationState *state,
if (invalidation->lowest_modified_value < refresh_window->end &&
invalidation->greatest_modified_value >= refresh_window->end)
{
/*
* If the invalidation is already cut on the left above, the remainder is set and
* will be reset here. The assert prevents losing information from the remainder.
*/
Assert((result == INVAL_CUT &&
remainder->lowest_modified_value == refresh_window->start) ||
result == INVAL_NOMATCH);
/*
* Need to cut in left end:
*
* |------|
* [------)
* [++++++++]
*
* [+++]
* [++++]
*/
upper = create_invalidation_tup(tupdesc,
cagg_hyper_id,

View File

@ -78,50 +78,134 @@ get_largest_bucketed_window(Oid timetype, int64 bucket_width)
}
/*
* Adjust the refresh window to align with buckets in an inclusive manner.
* Adjust the refresh window to align with inscribed buckets, so it includes buckets, which are
* fully covered by the refresh window.
*
* It is OK to refresh more than the given refresh window, but not less. Since
* we can only refresh along bucket boundaries, we need to adjust the refresh
* window to be inclusive in both ends to be able to refresh the given
* region. For example, if the dotted region below is the original window, the
* adjusted refresh window includes all four buckets shown.
* Bucketing refresh window is necessary for a continuous aggregate refresh, which can refresh only
* entire buckets. The result of the function is a bucketed window, where its start is at the start
* of the first bucket, which is fully inside the refresh window, and its end is at the end of the
* last fully covered bucket.
*
* | ....|.....|.. |
* Example1, the window needs to shrink:
* [---------) - given refresh window
* .|....|....|....|. - buckets
* [----) - inscribed bucketed window
*
* Example2, the window is already aligned:
* [----) - given refresh window
* .|....|....|....|. - buckets
* [----) - inscribed bucketed window
*
* This function is called for the continuous aggregate policy and manual refresh. In such cases
* excluding buckets, which are not fully covered by the refresh window, avoids refreshing a bucket,
* where part of its data was dropped by a retention policy. See #2198 for details.
*/
static InternalTimeRange
compute_bucketed_refresh_window(const InternalTimeRange *refresh_window, int64 bucket_width)
compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
const int64 bucket_width)
{
InternalTimeRange result = *refresh_window;
InternalTimeRange largest_bucketed_window =
get_largest_bucketed_window(refresh_window->type, bucket_width);
if (result.start <= largest_bucketed_window.start)
if (refresh_window->start <= largest_bucketed_window.start)
{
result.start = largest_bucketed_window.start;
else
result.start = ts_time_bucket_by_type(bucket_width, result.start, result.type);
if (result.end >= largest_bucketed_window.end)
result.end = largest_bucketed_window.end;
}
else
{
int64 exclusive_end = result.end;
/* The start time needs to be aligned with the first fully enclosed bucket.
* So the original window start is moved to the next bucket, except if the start is
* already aligned with a bucket, thus 1 is subtracted to avoid moving into the next
* bucket in the aligned case. */
int64 included_bucket =
ts_time_saturating_add(refresh_window->start, bucket_width - 1, refresh_window->type);
/* Get the start of the included bucket. */
result.start = ts_time_bucket_by_type(bucket_width, included_bucket, refresh_window->type);
}
if (refresh_window->end >= largest_bucketed_window.end)
{
result.end = largest_bucketed_window.end;
}
else
{
/* The window is reduced to the beginning of the bucket, which contains the exclusive
* end of the refresh window. */
result.end =
ts_time_bucket_by_type(bucket_width, refresh_window->end, refresh_window->type);
}
return result;
}
/*
* Adjust the refresh window to align with circumscribed buckets, so it includes buckets, which
* fully cover the refresh window.
*
* Bucketing refresh window is necessary for a continuous aggregate refresh, which can refresh only
* entire buckets. The result of the function is a bucketed window, where its start is at the start
* of a bucket, which contains the start of the refresh window, and its end is at the end of a
* bucket, which contains the end of the refresh window.
*
* Example1, the window needs to expand:
* [---------) - given refresh window
* .|....|....|....|. - buckets
* [--------------) - circumscribed bucketed window
*
* Example2, the window is already aligned:
* [----) - given refresh window
* .|....|....|....|. - buckets
 [----) - circumscribed bucketed window
*
* This function is called for an invalidation window before refreshing it and after the
* invalidation window was adjusted to be fully inside a refresh window. In the case of a
* continuous aggregate policy or manual refresh, the refresh window is the inscribed bucketed
* window.
*
* The circumscribed behavior is also used for a refresh on drop, when the refresh is called during
* dropping chunks manually or as part of retention policy.
*/
static InternalTimeRange
compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
const int64 bucket_width)
{
InternalTimeRange result = *refresh_window;
InternalTimeRange largest_bucketed_window =
get_largest_bucketed_window(refresh_window->type, bucket_width);
if (refresh_window->start <= largest_bucketed_window.start)
{
result.start = largest_bucketed_window.start;
}
else
{
/* For alignment with a bucket, which includes the start of the refresh window, we just
* need to get start of the bucket. */
result.start =
ts_time_bucket_by_type(bucket_width, refresh_window->start, refresh_window->type);
}
if (refresh_window->end >= largest_bucketed_window.end)
{
result.end = largest_bucketed_window.end;
}
else
{
int64 exclusive_end;
int64 bucketed_end;
Assert(refresh_window->end > result.start);
/* The end of the window is non-inclusive so subtract one before
* bucketing in case we're already at the end of the bucket (we don't
* want to add an extra bucket). But we also don't want to subtract if
* we are at the start of the bucket (we don't want to remove a
* bucket). The last */
if (result.end > result.start)
exclusive_end = ts_time_saturating_sub(result.end, 1, result.type);
bucketed_end = ts_time_bucket_by_type(bucket_width, exclusive_end, result.type);
* want to add an extra bucket). */
exclusive_end = ts_time_saturating_sub(refresh_window->end, 1, refresh_window->type);
bucketed_end = ts_time_bucket_by_type(bucket_width, exclusive_end, refresh_window->type);
/* We get the time value for the start of the bucket, so need to add
* bucket_width to get the end of it */
* bucket_width to get the end of it. */
result.end = ts_time_saturating_add(bucketed_end, bucket_width, refresh_window->type);
}
return result;
}
@ -235,7 +319,7 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
};
InternalTimeRange bucketed_refresh_window =
compute_bucketed_refresh_window(&invalidation, cagg->data.bucket_width);
compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width);
log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on");
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window);
@ -336,9 +420,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
Assert(OidIsValid(cagg->relid));
ereport(NOTICE,
(errmsg("refreshing continuous aggregate \"%s\"", get_rel_name(cagg->relid)),
errhint(
"Use WITH NO DATA if you do not want to refresh the continuous aggregate "
"on creation.")));
errhint("Use WITH NO DATA if you do not want to refresh the continuous "
"aggregate on creation.")));
}
continuous_agg_refresh_with_window(cagg, refresh_window, invalidations);
invalidation_store_free(invalidations);
@ -381,7 +464,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
errmsg("invalid refresh window"),
errhint("The start of the window must be before the end.")));
refresh_window = compute_bucketed_refresh_window(refresh_window_arg, cagg->data.bucket_width);
refresh_window =
compute_inscribed_bucketed_refresh_window(refresh_window_arg, cagg->data.bucket_width);
log_refresh_window(DEBUG1, cagg, &refresh_window, "refreshing continuous aggregate");
/* Perform the refresh across two transactions.
@ -419,17 +503,15 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* won't be refreshed when the threshold is moved forward in the
* future. The invalidation threshold should already be aligned on bucket
* boundary. */
if (refresh_window.end > invalidation_threshold)
{
if (refresh_window_arg->end > invalidation_threshold)
refresh_window.end = invalidation_threshold;
/* Capping the end might have made the window 0, or negative, so
* nothing to refresh in that case */
if (refresh_window.start >= refresh_window.end)
{
emit_up_to_date_notice(cagg);
return;
}
/* Capping the end might have made the window 0, or negative, so
* nothing to refresh in that case */
if (refresh_window.start >= refresh_window.end)
{
emit_up_to_date_notice(cagg);
return;
}
/* Process invalidations in the hypertable invalidation log */

View File

@ -158,7 +158,7 @@ SELECT * FROM cagg_invals;
5 | -9223372036854775808 | 9223372036854775807
(3 rows)
-- Now refresh up to 50, and the threshold should be updated accordingly:
-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly:
CALL refresh_continuous_aggregate('cond_10', 1, 50);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
@ -167,7 +167,18 @@ ORDER BY 1,2;
1 | 50
(1 row)
-- Invalidations should be cleared for the refresh window:
-- Invalidations should be cleared inside the refresh window:
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+---------------------
3 | -9223372036854775808 | 9
3 | 50 | 9223372036854775807
4 | -9223372036854775808 | 9223372036854775807
5 | -9223372036854775808 | 9223372036854775807
(4 rows)
-- Refresh up to 50 from the beginning
CALL refresh_continuous_aggregate('cond_10', 0, 50);
SELECT * FROM cagg_invals;
cagg_id | start | end
---------+----------------------+---------------------
@ -199,7 +210,7 @@ SELECT * FROM cagg_invals;
(4 rows)
-- Refreshing measure_10 moves the threshold only for the other hypertable:
CALL refresh_continuous_aggregate('measure_10', 1, 30);
CALL refresh_continuous_aggregate('measure_10', 0, 30);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
@ -412,7 +423,7 @@ SELECT * FROM cagg_invals;
(7 rows)
-- Refresh cond_10 to completely remove an invalidation:
CALL refresh_continuous_aggregate('cond_10', 1, 20);
CALL refresh_continuous_aggregate('cond_10', 0, 20);
-- The 1-19 invalidation should be deleted:
SELECT * FROM cagg_invals;
cagg_id | start | end
@ -869,14 +880,15 @@ ORDER BY 1,2;
INSERT INTO threshold_test
SELECT v, v FROM generate_series(1, 10) v;
CALL refresh_continuous_aggregate('thresh_2', 0, 5);
-- Threshold should move to end of refresh window (note that window
-- expands to end of bucket).
-- Threshold should move to end of the last refreshed bucket, which is
-- the last bucket fully included in the window, i.e., the window
-- shrinks to end of previous bucket.
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
7 | 6
7 | 4
(1 row)
-- Refresh where both the start and end of the window is above the

View File

@ -146,6 +146,7 @@ WHERE id = :job_id;
INSERT INTO continuous_agg_max_mat_date
SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day');
CALL run_job(:job_id);
NOTICE: continuous aggregate "max_mat_view_date" is already up-to-date
DROP MATERIALIZED VIEW max_mat_view_date;
CREATE TABLE continuous_agg_timestamp(time TIMESTAMP);
SELECT create_hypertable('continuous_agg_timestamp', 'time');

View File

@ -59,7 +59,7 @@ ORDER BY day DESC, device;
(0 rows)
-- Refresh the most recent few days:
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-05');
CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00');
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
@ -80,7 +80,7 @@ ORDER BY day DESC, device;
-- Refresh the rest (and try DEBUG output)
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03');
CALL refresh_continuous_aggregate('daily_temp', '2020-04-30', '2020-05-04');
DEBUG: refreshing continuous aggregate "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sun May 03 17:00:00 2020 PDT ]
DEBUG: hypertable 1 existing watermark >= new invalidation threshold 1588723200000000 1588550400000000
DEBUG: invalidation refresh on "daily_temp" in window [ Thu Apr 30 17:00:00 2020 PDT, Sat May 02 17:00:00 2020 PDT ]
@ -243,7 +243,7 @@ AS
SELECT time_bucket(SMALLINT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_smallint c
GROUP BY 1,2 WITH NO DATA;
CALL refresh_continuous_aggregate('cond_20_smallint', 5::smallint, 50::smallint);
CALL refresh_continuous_aggregate('cond_20_smallint', 0::smallint, 70::smallint);
SELECT * FROM cond_20_smallint
ORDER BY 1,2;
bucket | device | avg_temp
@ -294,7 +294,7 @@ AS
SELECT time_bucket(INT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_int
GROUP BY 1,2 WITH NO DATA;
CALL refresh_continuous_aggregate('cond_20_int', 5, 50);
CALL refresh_continuous_aggregate('cond_20_int', 0, 65);
SELECT * FROM cond_20_int
ORDER BY 1,2;
bucket | device | avg_temp
@ -345,7 +345,7 @@ AS
SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_bigint
GROUP BY 1,2 WITH NO DATA;
CALL refresh_continuous_aggregate('cond_20_bigint', 5, 50);
CALL refresh_continuous_aggregate('cond_20_bigint', 0, 75);
SELECT * FROM cond_20_bigint
ORDER BY 1,2;
bucket | device | avg_temp

View File

@ -2,7 +2,7 @@ Parsed test spec with 8 sessions
starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
step S1_select:
SELECT bucket, avg_temp
@ -31,7 +31,7 @@ hypertable threshold
conditions 70
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
step S1_select:
SELECT bucket, avg_temp
@ -78,7 +78,7 @@ step L2_read_lock_threshold_table:
IN ACCESS SHARE MODE;
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
<waiting ...>
step L2_read_unlock_threshold_table:
ROLLBACK;
@ -118,14 +118,14 @@ step L1_unlock_threshold_table:
starting permutation: R1_refresh L2_read_lock_threshold_table R3_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
step L2_read_lock_threshold_table:
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS SHARE MODE;
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
<waiting ...>
step L2_read_unlock_threshold_table:
ROLLBACK;
@ -169,14 +169,14 @@ step L1_unlock_threshold_table:
starting permutation: R3_refresh L2_read_lock_threshold_table R1_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
step L2_read_lock_threshold_table:
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS SHARE MODE;
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step L2_read_unlock_threshold_table:
ROLLBACK;
@ -226,7 +226,7 @@ lock_cagg
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step L3_unlock_cagg_table:
ROLLBACK;
@ -273,7 +273,7 @@ lock_cagg
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step R2_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
@ -325,10 +325,10 @@ lock_cagg
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
<waiting ...>
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
<waiting ...>
step L3_unlock_cagg_table:
ROLLBACK;
@ -379,7 +379,7 @@ lock_cagg
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
<waiting ...>
step R4_refresh:
CALL refresh_continuous_aggregate('cond_20', 39, 84);

View File

@ -128,7 +128,7 @@ setup
}
step "R1_refresh"
{
CALL refresh_continuous_aggregate('cond_10', 35, 62);
CALL refresh_continuous_aggregate('cond_10', 25, 70);
}
@ -154,7 +154,7 @@ setup
}
step "R3_refresh"
{
CALL refresh_continuous_aggregate('cond_10', 71, 97);
CALL refresh_continuous_aggregate('cond_10', 70, 107);
}
# Overlapping refresh on another continuous aggregate (cond_20)

View File

@ -110,12 +110,16 @@ ORDER BY 1,2;
-- invalidation log:
SELECT * FROM cagg_invals;
-- Now refresh up to 50, and the threshold should be updated accordingly:
-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly:
CALL refresh_continuous_aggregate('cond_10', 1, 50);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- Invalidations should be cleared for the refresh window:
-- Invalidations should be cleared inside the refresh window:
SELECT * FROM cagg_invals;
-- Refresh up to 50 from the beginning
CALL refresh_continuous_aggregate('cond_10', 0, 50);
SELECT * FROM cagg_invals;
-- Refreshing below the threshold does not move it:
@ -128,7 +132,7 @@ ORDER BY 1,2;
SELECT * FROM cagg_invals;
-- Refreshing measure_10 moves the threshold only for the other hypertable:
CALL refresh_continuous_aggregate('measure_10', 1, 30);
CALL refresh_continuous_aggregate('measure_10', 0, 30);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
SELECT * FROM cagg_invals;
@ -218,7 +222,7 @@ CALL refresh_continuous_aggregate('cond_20', 20, 60);
SELECT * FROM cagg_invals;
-- Refresh cond_10 to completely remove an invalidation:
CALL refresh_continuous_aggregate('cond_10', 1, 20);
CALL refresh_continuous_aggregate('cond_10', 0, 20);
-- The 1-19 invalidation should be deleted:
SELECT * FROM cagg_invals;
@ -479,8 +483,9 @@ SELECT v, v FROM generate_series(1, 10) v;
CALL refresh_continuous_aggregate('thresh_2', 0, 5);
-- Threshold should move to end of refresh window (note that window
-- expands to end of bucket).
-- Threshold should move to end of the last refreshed bucket, which is
-- the last bucket fully included in the window, i.e., the window
-- shrinks to end of previous bucket.
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
WHERE hypertable_id = :thresh_hyper_id
ORDER BY 1,2;

View File

@ -34,14 +34,14 @@ SELECT * FROM daily_temp
ORDER BY day DESC, device;
-- Refresh the most recent few days:
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-05');
CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00');
SELECT * FROM daily_temp
ORDER BY day DESC, device;
-- Refresh the rest (and try DEBUG output)
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03');
CALL refresh_continuous_aggregate('daily_temp', '2020-04-30', '2020-05-04');
RESET client_min_messages;
-- Compare the aggregate to the equivalent query on the source table
@ -140,7 +140,7 @@ SELECT time_bucket(SMALLINT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_smallint c
GROUP BY 1,2 WITH NO DATA;
CALL refresh_continuous_aggregate('cond_20_smallint', 5::smallint, 50::smallint);
CALL refresh_continuous_aggregate('cond_20_smallint', 0::smallint, 70::smallint);
SELECT * FROM cond_20_smallint
ORDER BY 1,2;
@ -173,7 +173,7 @@ SELECT time_bucket(INT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_int
GROUP BY 1,2 WITH NO DATA;
CALL refresh_continuous_aggregate('cond_20_int', 5, 50);
CALL refresh_continuous_aggregate('cond_20_int', 0, 65);
SELECT * FROM cond_20_int
ORDER BY 1,2;
@ -206,7 +206,7 @@ SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_bigint
GROUP BY 1,2 WITH NO DATA;
CALL refresh_continuous_aggregate('cond_20_bigint', 5, 50);
CALL refresh_continuous_aggregate('cond_20_bigint', 0, 75);
SELECT * FROM cond_20_bigint
ORDER BY 1,2;