From 06f08c5b25e1fa3b7aad0a675bfde675e28c4cdb Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Wed, 6 Mar 2024 12:01:48 +0100 Subject: [PATCH] Convert max materializations setting to GUC PR #2926 introduced a session-based configuration parameter for the CAgg refresh behavior. If more individual refreshes have to be carried out than specified by this setting, a refresh for a larger window is performed. It is mentioned in the original PR that this setting should be converted into a GUC later. This PR performs the proposed change. --- src/guc.c | 21 ++++++++++ src/guc.h | 1 + tsl/src/continuous_aggs/refresh.c | 52 +------------------------ tsl/test/expected/cagg_invalidation.out | 13 ++++--- tsl/test/sql/cagg_invalidation.sql | 2 + 5 files changed, 32 insertions(+), 57 deletions(-) diff --git a/src/guc.c b/src/guc.c index a8fff8098..a7ffd79cf 100644 --- a/src/guc.c +++ b/src/guc.c @@ -62,6 +62,7 @@ bool ts_guc_enable_qual_propagation = true; bool ts_guc_enable_cagg_reorder_groupby = true; bool ts_guc_enable_now_constify = true; TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true; +TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10; bool ts_guc_enable_osm_reads = true; TSDLLEXPORT bool ts_guc_enable_dml_decompression = true; TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000; @@ -437,6 +438,26 @@ _guc_init(void) NULL, NULL); + /* + * Define the limit on number of invalidation-based refreshes we allow per + * refresh call. If this limit is exceeded, fall back to a single refresh that + * covers the range decided by the min and max invalidated time. + */ + DefineCustomIntVariable(MAKE_EXTOPTION("materializations_per_refresh_window"), + "Max number of materializations per cagg refresh window", + "The maximal number of individual refreshes per cagg refresh. If more " + "refreshes need to be performed, they are merged into a larger " + "single refresh.", + &ts_guc_cagg_max_individual_materializations, + 10, + 0, + INT_MAX, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); + DefineCustomBoolVariable(MAKE_EXTOPTION("enable_tiered_reads"), "Enable tiered data reads", "Enable reading of tiered data by including a foreign table " diff --git a/src/guc.h b/src/guc.h index e8844fb17..ecaa739fe 100644 --- a/src/guc.h +++ b/src/guc.h @@ -24,6 +24,7 @@ extern bool ts_guc_enable_qual_propagation; extern bool ts_guc_enable_runtime_exclusion; extern bool ts_guc_enable_constraint_exclusion; extern bool ts_guc_enable_cagg_reorder_groupby; +extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations; extern bool ts_guc_enable_now_constify; extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify; extern bool ts_guc_enable_osm_reads; diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 41d39b190..f22076c5d 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -55,7 +55,6 @@ static void continuous_agg_refresh_execute(const CaggRefreshState *refresh, const int32 chunk_id); static void log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, const char *msg); -static long materialization_per_refresh_window(void); static void continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window, const long iteration, void *arg1_refresh, void *arg2_chunk_id); @@ -336,53 +335,6 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang DatumGetCString(OidFunctionCall1(outfuncid, end_ts))); } -/* - * Get the limit on number of invalidation-based refreshes we allow per - * refresh call. If this limit is exceeded, fall back to a single refresh that - * covers the range decided by the min and max invalidated time. - * - * Use a session variable for debugging and testing. In other words, this - * purposefully not a user-visible GUC. Might be promoted to official GUC in - * the future. - */ -static long -materialization_per_refresh_window(void) -{ -#define DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW 10 -#define MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME \ - MAKE_EXTOPTION("materializations_per_refresh_window") - - const char *max_materializations_setting = - GetConfigOption(MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME, true, false); - long max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW; - - if (max_materializations_setting) - { - char *endptr = NULL; - - /* Not using pg_strtol here since we don't want to throw error in case - * of parsing issue */ - max_materializations = strtol(max_materializations_setting, &endptr, 10); - - /* Accept trailing whitespaces */ - while (*endptr == ' ') - endptr++; - - if (*endptr != '\0') - { - ereport(WARNING, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for session variable \"%s\"", - MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME), - errdetail("Expected an integer but current value is \"%s\".", - max_materializations_setting))); - max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW; - } - } - - return max_materializations; -} - typedef void (*scan_refresh_ranges_funct_t)(const InternalTimeRange *bucketed_refresh_window, const long iteration, /* 0 is first range */ void *arg1, void *arg2); @@ -661,7 +613,6 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false); bool do_merged_refresh = false; InternalTimeRange merged_refresh_window; - long max_materializations; /* Lock the continuous aggregate's materialized hypertable to protect * against concurrent refreshes. Only concurrent reads will be @@ -674,12 +625,11 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, LockRelationOid(hyper_relid, ExclusiveLock); const CaggsInfo all_caggs_info = ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id); - max_materializations = materialization_per_refresh_window(); invalidations = invalidation_process_cagg_log(cagg->data.mat_hypertable_id, cagg->data.raw_hypertable_id, refresh_window, &all_caggs_info, - max_materializations, + ts_guc_cagg_max_individual_materializations, &do_merged_refresh, &merged_refresh_window); diff --git a/tsl/test/expected/cagg_invalidation.out b/tsl/test/expected/cagg_invalidation.out index 87ac5fc4f..9b12fb520 100644 --- a/tsl/test/expected/cagg_invalidation.out +++ b/tsl/test/expected/cagg_invalidation.out @@ -1201,32 +1201,33 @@ INSERT INTO conditions VALUES (120, 1, 1.0); INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); \set VERBOSITY default +\set ON_ERROR_STOP 0 -- Test acceptable values for materializations per refresh SET timescaledb.materializations_per_refresh_window=' 5 '; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -- Large value will be treated as LONG_MAX SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347; +ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "342239897234023842394249234766923492347" +HINT: Value exceeds integer range. INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -- Test bad values for materializations per refresh SET timescaledb.materializations_per_refresh_window='foo'; +ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "foo" INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" -DETAIL: Expected an integer but current value is "foo". SET timescaledb.materializations_per_refresh_window='2bar'; +ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "2bar" INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" -DETAIL: Expected an integer but current value is "2bar". SET timescaledb.materializations_per_refresh_window='-'; +ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "-" INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" -DETAIL: Expected an integer but current value is "-". \set VERBOSITY terse RESET timescaledb.materializations_per_refresh_window; +\set ON_ERROR_STOP 1 -- Test refresh with undefined invalidation threshold and variable sized buckets CREATE TABLE timestamp_ht ( time timestamptz NOT NULL, diff --git a/tsl/test/sql/cagg_invalidation.sql b/tsl/test/sql/cagg_invalidation.sql index d9bdee8a8..387ebeee3 100644 --- a/tsl/test/sql/cagg_invalidation.sql +++ b/tsl/test/sql/cagg_invalidation.sql @@ -698,6 +698,7 @@ INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); \set VERBOSITY default +\set ON_ERROR_STOP 0 -- Test acceptable values for materializations per refresh SET timescaledb.materializations_per_refresh_window=' 5 '; INSERT INTO conditions VALUES (140, 1, 1.0); @@ -720,6 +721,7 @@ INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); \set VERBOSITY terse RESET timescaledb.materializations_per_refresh_window; +\set ON_ERROR_STOP 1 -- Test refresh with undefined invalidation threshold and variable sized buckets CREATE TABLE timestamp_ht (