diff --git a/CHANGELOG.md b/CHANGELOG.md index f84b6e2bb..321eeb533 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.** **Minor features** * #2736 Support adding columns to hypertables with compression enabled +* #2926 Optimize cagg refresh for small invalidations **Bugfixes** * #2883 Fix join qual propagation for nested joins @@ -16,6 +17,8 @@ accidentally triggering the load of a previous DB version.** **Thanks** * @zeeshanshabbir93 for reporting an issue with joins +* @Antiarchitect for reporting the issue with slow refreshes of + continuous aggregates. ## 1.7.5 (2021-02-12) diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index e52dd6ade..645d9c6e1 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -287,6 +288,79 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang DatumGetCString(OidFunctionCall1(outfuncid, end_ts))); } +/* + * Get the limit on number of invalidation-based refreshes we allow per + * refresh call. If this limit is exceeded, fall back to a single refresh that + * covers the range decided by the min and max invalidated time. + * + * Use a session variable for debugging and testing. In other words, this is + * purposefully not a user-visible GUC. Might be promoted to an official GUC in + * the future. 
+ *
+ * The result is the parsed value of the session variable when it holds an
+ * integer, and the built-in default when GetConfigOption() returns NULL or
+ * when the value cannot be parsed as an integer (a WARNING is emitted in the
+ * latter case). Values that do not fit in a long are clamped by strtol()
+ * instead of being rejected. Negative values are not rejected here.
+ */
+static long
+materialization_per_refresh_window(void)
+{
+#define DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW 10
+#define MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME \
+	"timescaledb.materializations_per_refresh_window"
+
+	const char *max_materializations_setting =
+		GetConfigOption(MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME, true, false);
+	long max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
+
+	if (max_materializations_setting)
+	{
+		char *endptr = NULL;
+		bool converted;
+
+		/* Not using pg_strtol here since we don't want to throw error in case
+		 * of parsing issue. Out-of-range input is clamped by strtol() to
+		 * LONG_MIN/LONG_MAX and deliberately accepted. */
+		max_materializations = strtol(max_materializations_setting, &endptr, 10);
+
+		/* strtol() leaves endptr at the start of the string when it could
+		 * not convert anything (e.g., an empty or all-blank value). Remember
+		 * that before skipping trailing blanks; otherwise such a value would
+		 * look like a fully consumed integer and silently become 0. */
+		converted = (endptr != max_materializations_setting);
+
+		/* Accept trailing whitespaces */
+		while (*endptr == ' ')
+			endptr++;
+
+		if (!converted || *endptr != '\0')
+		{
+			ereport(WARNING,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid value for session variable \"%s\"",
+							MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME),
+					 errdetail("Expected an integer but current value is \"%s\".",
+							   max_materializations_setting)));
+			max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
+		}
+	}
+
+	return max_materializations;
+}
+
+/*
+ * Execute refreshes based on the processed invalidations.
+ *
+ * The given refresh window covers a set of buckets, some of which are
+ * out-of-date (invalid) and some which are up-to-date (valid). Invalid
+ * buckets that are adjacent form larger ranges, as shown below.
+ *
+ * Refresh window:  [-----------------------------------------)
+ * Invalid ranges:       [-----] [-]   [--] [-] [---]
+ * Merged range:         [---------------------------)
+ *
+ * The maximum number of individual (non-mergeable) ranges is
+ * #buckets_in_window/2 (i.e., every other bucket is invalid).
+ *
+ * Since it might not be efficient to materialize a lot of buckets separately
+ * when there are many invalid (non-adjacent) buckets/ranges, we put a limit
+ * on the number of individual materializations we do. 
This limit is + * determined by the MATERIALIZATIONS_PER_REFRESH_WINDOW setting. + * + * Thus, if the refresh window covers a large number of buckets, but only a + * few of them are invalid, it is likely beneficial to materialize these + * separately to avoid materializing a lot of buckets that are already + * up-to-date. But if the number of invalid buckets/ranges goes above the + * threshold, we materialize all of them in one go using the "merged range", + * as illustrated above. + * + * The merged refresh is chosen only when the number of stored invalidations + * is strictly greater than the limit. + */ static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, @@ -294,8 +368,20 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg, { CaggRefreshState refresh; TupleTableSlot *slot; + bool do_merged_refresh = false; + InternalTimeRange merged_refresh_window; + long count = 0; continuous_agg_refresh_init(&refresh, cagg, refresh_window); + + /* + * If there are many individual invalidation ranges to refresh, then + * revert to a merged refresh across the range decided by lowest and + * highest invalidated value. The decision is made once, before the + * invalidations are iterated, from the tuple count of the tuplestore. 
+ */ + if (tuplestore_tuple_count(invalidations->tupstore) > materialization_per_refresh_window()) + do_merged_refresh = true; + slot = MakeSingleTupleTableSlotCompat(invalidations->tupdesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(invalidations->tupstore, @@ -323,8 +409,39 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg, InternalTimeRange bucketed_refresh_window = compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width); - log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on"); - continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id); + if (do_merged_refresh) + { + if (count == 0) + merged_refresh_window = bucketed_refresh_window; /* first range seeds the merged window */ + else + { + if (bucketed_refresh_window.start < merged_refresh_window.start) + merged_refresh_window.start = bucketed_refresh_window.start; + + if (bucketed_refresh_window.end > merged_refresh_window.end) + merged_refresh_window.end = bucketed_refresh_window.end; + } + } + else + { + log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on"); + continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id); + } + + count++; /* incremented in both the merged and the individual case */ + } + + if (do_merged_refresh && count > 0) /* merged_refresh_window is only assigned once count > 0 */ + { + Assert(merged_refresh_window.type == refresh_window->type); + Assert(merged_refresh_window.start >= refresh_window->start); + Assert(merged_refresh_window.end <= refresh_window->end); + + log_refresh_window(DEBUG1, + cagg, + &merged_refresh_window, + psprintf("merged %ld invalidations for refresh on", count)); + continuous_agg_refresh_execute(&refresh, &merged_refresh_window, chunk_id); } ExecDropSingleTupleTableSlot(slot); diff --git a/tsl/test/expected/continuous_aggs_invalidation.out b/tsl/test/expected/continuous_aggs_invalidation.out index 5fe7e01fe..f669fb309 100644 --- a/tsl/test/expected/continuous_aggs_invalidation.out +++ b/tsl/test/expected/continuous_aggs_invalidation.out @@ -1131,3 +1131,53 @@ WHERE cagg_id 
= :cond_10_id; 3 | 60 | 9223372036854775807 (4 rows) +-- should trigger two individual refreshes +SET client_min_messages TO DEBUG1; +CALL refresh_continuous_aggregate('cond_10', 0, 200); +DEBUG: refreshing continuous aggregate "cond_10" in window [ 0, 200 ] +DEBUG: invalidation refresh on "cond_10" in window [ 0, 30 ] +DEBUG: invalidation refresh on "cond_10" in window [ 40, 50 ] +DEBUG: invalidation refresh on "cond_10" in window [ 60, 200 ] +RESET client_min_messages; +-- Allow at most 5 individual invalidations per refreshe +SET timescaledb.materializations_per_refresh_window=5; +-- Insert into every second bucket +INSERT INTO conditions VALUES (20, 1, 1.0); +INSERT INTO conditions VALUES (40, 1, 1.0); +INSERT INTO conditions VALUES (60, 1, 1.0); +INSERT INTO conditions VALUES (80, 1, 1.0); +INSERT INTO conditions VALUES (100, 1, 1.0); +INSERT INTO conditions VALUES (120, 1, 1.0); +INSERT INTO conditions VALUES (140, 1, 1.0); +SET client_min_messages TO DEBUG1; +CALL refresh_continuous_aggregate('cond_10', 0, 200); +DEBUG: refreshing continuous aggregate "cond_10" in window [ 0, 200 ] +DEBUG: hypertable 1 existing watermark >= new invalidation threshold 200 200 +DEBUG: merged 7 invalidations for refresh on "cond_10" in window [ 20, 150 ] +RESET client_min_messages; +\set VERBOSITY default +-- Test acceptable values for materializations per refresh +SET timescaledb.materializations_per_refresh_window=' 5 '; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Large value will be treated as LONG_MAX +SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Test bad values for materializations per refresh +SET timescaledb.materializations_per_refresh_window='foo'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +WARNING: 
invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "foo". +SET timescaledb.materializations_per_refresh_window='2bar'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "2bar". +SET timescaledb.materializations_per_refresh_window='-'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "-". +\set VERBOSITY terse diff --git a/tsl/test/sql/continuous_aggs_invalidation.sql b/tsl/test/sql/continuous_aggs_invalidation.sql index 67c0c689f..64c7acb0b 100644 --- a/tsl/test/sql/continuous_aggs_invalidation.sql +++ b/tsl/test/sql/continuous_aggs_invalidation.sql @@ -624,7 +624,6 @@ WHERE cagg_id = :cond_1_id; -- Test that single timestamp invalidations are expanded to buckets, -- and adjacent buckets merged. 
--------------------------------------------------------------------- - -- First clear invalidations in a range: CALL refresh_continuous_aggregate('cond_10', -20, 60); @@ -645,3 +644,47 @@ WHERE user_view_name = 'cond_10' \gset SELECT * FROM cagg_invals WHERE cagg_id = :cond_10_id; + +-- should trigger two individual refreshes +SET client_min_messages TO DEBUG1; +CALL refresh_continuous_aggregate('cond_10', 0, 200); +RESET client_min_messages; + +-- Allow at most 5 individual invalidations per refreshe +SET timescaledb.materializations_per_refresh_window=5; + +-- Insert into every second bucket +INSERT INTO conditions VALUES (20, 1, 1.0); +INSERT INTO conditions VALUES (40, 1, 1.0); +INSERT INTO conditions VALUES (60, 1, 1.0); +INSERT INTO conditions VALUES (80, 1, 1.0); +INSERT INTO conditions VALUES (100, 1, 1.0); +INSERT INTO conditions VALUES (120, 1, 1.0); +INSERT INTO conditions VALUES (140, 1, 1.0); + +SET client_min_messages TO DEBUG1; +CALL refresh_continuous_aggregate('cond_10', 0, 200); +RESET client_min_messages; + +\set VERBOSITY default +-- Test acceptable values for materializations per refresh +SET timescaledb.materializations_per_refresh_window=' 5 '; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Large value will be treated as LONG_MAX +SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); + +-- Test bad values for materializations per refresh +SET timescaledb.materializations_per_refresh_window='foo'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +SET timescaledb.materializations_per_refresh_window='2bar'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); + +SET timescaledb.materializations_per_refresh_window='-'; +INSERT INTO conditions VALUES 
(140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +\set VERBOSITY terse