Limit number of materializations per cagg refresh

When there are many small (e.g., single-timestamp) invalidations that
cannot be merged even after being expanded to full buckets (in the
worst case, invalidations are spread across every other bucket), it
might no longer be beneficial to materialize every invalidation
separately.

Instead, this change adds a threshold on the number of invalidations
processed by a refresh (currently 10 by default). Above this
threshold, the invalidations are merged into a single range spanning
the lowest and highest invalidated time values.

The limit can be controlled via an anonymous session variable for
debugging and tuning purposes. It might be promoted to an official
GUC in the future.
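
For example, the tests added in this commit lower the limit for the
current session before refreshing (a sketch taken from those tests;
"cond_10" is the continuous aggregate they use, and values that cannot
be parsed as an integer fall back to the default of 10 with a warning):

    SET timescaledb.materializations_per_refresh_window = 5;
    CALL refresh_continuous_aggregate('cond_10', 0, 200);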

Fixes #2867
Erik Nordström 2021-02-10 16:59:35 +01:00 committed by Erik Nordström
parent d4a93c7745
commit cc287f966d
4 changed files with 216 additions and 3 deletions

@@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.**
**Minor features**
* #2736 Support adding columns to hypertables with compression enabled
* #2926 Optimize cagg refresh for small invalidations
**Bugfixes**
* #2883 Fix join qual propagation for nested joins
@@ -16,6 +17,8 @@ accidentally triggering the load of a previous DB version.**
**Thanks**
* @zeeshanshabbir93 for reporting an issue with joins
* @Antiarchitect for reporting the issue with slow refreshes of
continuous aggregates.
## 1.7.5 (2021-02-12)

@@ -9,6 +9,7 @@
#include <utils/fmgrprotos.h>
#include <utils/snapmgr.h>
#include <utils/guc.h>
#include <utils/builtins.h>
#include <access/xact.h>
#include <storage/lmgr.h>
#include <miscadmin.h>
@@ -287,6 +288,79 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
DatumGetCString(OidFunctionCall1(outfuncid, end_ts)));
}
/*
* Get the limit on number of invalidation-based refreshes we allow per
* refresh call. If this limit is exceeded, fall back to a single refresh that
* covers the range decided by the min and max invalidated time.
*
* Use a session variable for debugging and testing. In other words, this is
* purposefully not a user-visible GUC. It might be promoted to an official
* GUC in the future.
*/
static long
materialization_per_refresh_window(void)
{
#define DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW 10
#define MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME \
"timescaledb.materializations_per_refresh_window"
const char *max_materializations_setting =
GetConfigOption(MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME, true, false);
long max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
if (max_materializations_setting)
{
char *endptr = NULL;
/* Not using pg_strtol here since we don't want to throw an error in case
* of a parsing issue */
max_materializations = strtol(max_materializations_setting, &endptr, 10);
/* Accept trailing whitespace */
while (*endptr == ' ')
endptr++;
if (*endptr != '\0')
{
ereport(WARNING,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for session variable \"%s\"",
MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME),
errdetail("Expected an integer but current value is \"%s\".",
max_materializations_setting)));
max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
}
}
return max_materializations;
}
/*
* Execute refreshes based on the processed invalidations.
*
* The given refresh window covers a set of buckets, some of which are
* out-of-date (invalid) and some which are up-to-date (valid). Invalid
* buckets that are adjacent form larger ranges, as shown below.
*
* Refresh window: [-----------------------------------------)
* Invalid ranges: [-----] [-] [--] [-] [---]
* Merged range: [---------------------------)
*
* The maximum number of individual (non-mergeable) ranges is
* #buckets_in_window/2 (i.e., every other bucket is invalid).
*
* Since it might not be efficient to materialize a lot of buckets separately
* when there are many invalid (non-adjacent) buckets/ranges, we put a limit
* on the number of individual materializations we do. This limit is
* determined by the MATERIALIZATIONS_PER_REFRESH_WINDOW setting.
*
* Thus, if the refresh window covers a large number of buckets, but only a
* few of them are invalid, it is likely beneficial to materialize these
* separately to avoid materializing a lot of buckets that are already
* up-to-date. But if the number of invalid buckets/ranges goes above the
* threshold, we materialize all of them in one go using the "merged range",
* as illustrated above.
*/
static void
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
@@ -294,8 +368,20 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
{
CaggRefreshState refresh;
TupleTableSlot *slot;
bool do_merged_refresh = false;
InternalTimeRange merged_refresh_window;
long count = 0;
continuous_agg_refresh_init(&refresh, cagg, refresh_window);
/*
* If there are many individual invalidation ranges to refresh, then
* revert to a merged refresh across the range decided by the lowest and
* highest invalidated values.
*/
if (tuplestore_tuple_count(invalidations->tupstore) > materialization_per_refresh_window())
do_merged_refresh = true;
slot = MakeSingleTupleTableSlotCompat(invalidations->tupdesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(invalidations->tupstore,
@@ -323,8 +409,39 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
InternalTimeRange bucketed_refresh_window =
compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width);
log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on");
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id);
if (do_merged_refresh)
{
if (count == 0)
merged_refresh_window = bucketed_refresh_window;
else
{
if (bucketed_refresh_window.start < merged_refresh_window.start)
merged_refresh_window.start = bucketed_refresh_window.start;
if (bucketed_refresh_window.end > merged_refresh_window.end)
merged_refresh_window.end = bucketed_refresh_window.end;
}
}
else
{
log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on");
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id);
}
count++;
}
if (do_merged_refresh && count > 0)
{
Assert(merged_refresh_window.type == refresh_window->type);
Assert(merged_refresh_window.start >= refresh_window->start);
Assert(merged_refresh_window.end <= refresh_window->end);
log_refresh_window(DEBUG1,
cagg,
&merged_refresh_window,
psprintf("merged %ld invalidations for refresh on", count));
continuous_agg_refresh_execute(&refresh, &merged_refresh_window, chunk_id);
}
ExecDropSingleTupleTableSlot(slot);

@@ -1131,3 +1131,53 @@ WHERE cagg_id = :cond_10_id;
3 | 60 | 9223372036854775807
(4 rows)
-- should trigger three individual refreshes
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
DEBUG: refreshing continuous aggregate "cond_10" in window [ 0, 200 ]
DEBUG: invalidation refresh on "cond_10" in window [ 0, 30 ]
DEBUG: invalidation refresh on "cond_10" in window [ 40, 50 ]
DEBUG: invalidation refresh on "cond_10" in window [ 60, 200 ]
RESET client_min_messages;
-- Allow at most 5 individual invalidations per refresh
SET timescaledb.materializations_per_refresh_window=5;
-- Insert into every second bucket
INSERT INTO conditions VALUES (20, 1, 1.0);
INSERT INTO conditions VALUES (40, 1, 1.0);
INSERT INTO conditions VALUES (60, 1, 1.0);
INSERT INTO conditions VALUES (80, 1, 1.0);
INSERT INTO conditions VALUES (100, 1, 1.0);
INSERT INTO conditions VALUES (120, 1, 1.0);
INSERT INTO conditions VALUES (140, 1, 1.0);
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
DEBUG: refreshing continuous aggregate "cond_10" in window [ 0, 200 ]
DEBUG: hypertable 1 existing watermark >= new invalidation threshold 200 200
DEBUG: merged 7 invalidations for refresh on "cond_10" in window [ 20, 150 ]
RESET client_min_messages;
\set VERBOSITY default
-- Test acceptable values for materializations per refresh
SET timescaledb.materializations_per_refresh_window=' 5 ';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Large value will be treated as LONG_MAX
SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Test bad values for materializations per refresh
SET timescaledb.materializations_per_refresh_window='foo';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "foo".
SET timescaledb.materializations_per_refresh_window='2bar';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "2bar".
SET timescaledb.materializations_per_refresh_window='-';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "-".
\set VERBOSITY terse

@@ -624,7 +624,6 @@ WHERE cagg_id = :cond_1_id;
-- Test that single timestamp invalidations are expanded to buckets,
-- and adjacent buckets merged.
---------------------------------------------------------------------
-- First clear invalidations in a range:
CALL refresh_continuous_aggregate('cond_10', -20, 60);
@@ -645,3 +644,47 @@ WHERE user_view_name = 'cond_10' \gset
SELECT * FROM cagg_invals
WHERE cagg_id = :cond_10_id;
-- should trigger three individual refreshes
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
RESET client_min_messages;
-- Allow at most 5 individual invalidations per refresh
SET timescaledb.materializations_per_refresh_window=5;
-- Insert into every second bucket
INSERT INTO conditions VALUES (20, 1, 1.0);
INSERT INTO conditions VALUES (40, 1, 1.0);
INSERT INTO conditions VALUES (60, 1, 1.0);
INSERT INTO conditions VALUES (80, 1, 1.0);
INSERT INTO conditions VALUES (100, 1, 1.0);
INSERT INTO conditions VALUES (120, 1, 1.0);
INSERT INTO conditions VALUES (140, 1, 1.0);
SET client_min_messages TO DEBUG1;
CALL refresh_continuous_aggregate('cond_10', 0, 200);
RESET client_min_messages;
\set VERBOSITY default
-- Test acceptable values for materializations per refresh
SET timescaledb.materializations_per_refresh_window=' 5 ';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Large value will be treated as LONG_MAX
SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Test bad values for materializations per refresh
SET timescaledb.materializations_per_refresh_window='foo';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
SET timescaledb.materializations_per_refresh_window='2bar';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
SET timescaledb.materializations_per_refresh_window='-';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
\set VERBOSITY terse