mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-21 21:21:22 +08:00
Convert max materializations setting to GUC
PR #2926 introduced a session-based configuration parameter for the CAgg refresh behavior. If more individual refreshes have to be carried out than specified by this setting, a refresh for a larger window is performed. It is mentioned in the original PR that this setting should be converted into a GUC later. This PR performs the proposed change.
This commit is contained in:
parent
e30699101b
commit
06f08c5b25
21
src/guc.c
21
src/guc.c
@ -62,6 +62,7 @@ bool ts_guc_enable_qual_propagation = true;
|
|||||||
bool ts_guc_enable_cagg_reorder_groupby = true;
|
bool ts_guc_enable_cagg_reorder_groupby = true;
|
||||||
bool ts_guc_enable_now_constify = true;
|
bool ts_guc_enable_now_constify = true;
|
||||||
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
|
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
|
||||||
|
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
|
||||||
bool ts_guc_enable_osm_reads = true;
|
bool ts_guc_enable_osm_reads = true;
|
||||||
TSDLLEXPORT bool ts_guc_enable_dml_decompression = true;
|
TSDLLEXPORT bool ts_guc_enable_dml_decompression = true;
|
||||||
TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000;
|
TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000;
|
||||||
@ -437,6 +438,26 @@ _guc_init(void)
|
|||||||
NULL,
|
NULL,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Define the limit on number of invalidation-based refreshes we allow per
|
||||||
|
* refresh call. If this limit is exceeded, fall back to a single refresh that
|
||||||
|
* covers the range decided by the min and max invalidated time.
|
||||||
|
*/
|
||||||
|
DefineCustomIntVariable(MAKE_EXTOPTION("materializations_per_refresh_window"),
|
||||||
|
"Max number of materializations per cagg refresh window",
|
||||||
|
"The maximal number of individual refreshes per cagg refresh. If more "
|
||||||
|
"refreshes need to be performed, they are merged into a larger "
|
||||||
|
"single refresh.",
|
||||||
|
&ts_guc_cagg_max_individual_materializations,
|
||||||
|
10,
|
||||||
|
0,
|
||||||
|
INT_MAX,
|
||||||
|
PGC_USERSET,
|
||||||
|
0,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_tiered_reads"),
|
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_tiered_reads"),
|
||||||
"Enable tiered data reads",
|
"Enable tiered data reads",
|
||||||
"Enable reading of tiered data by including a foreign table "
|
"Enable reading of tiered data by including a foreign table "
|
||||||
|
@ -24,6 +24,7 @@ extern bool ts_guc_enable_qual_propagation;
|
|||||||
extern bool ts_guc_enable_runtime_exclusion;
|
extern bool ts_guc_enable_runtime_exclusion;
|
||||||
extern bool ts_guc_enable_constraint_exclusion;
|
extern bool ts_guc_enable_constraint_exclusion;
|
||||||
extern bool ts_guc_enable_cagg_reorder_groupby;
|
extern bool ts_guc_enable_cagg_reorder_groupby;
|
||||||
|
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
|
||||||
extern bool ts_guc_enable_now_constify;
|
extern bool ts_guc_enable_now_constify;
|
||||||
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
|
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
|
||||||
extern bool ts_guc_enable_osm_reads;
|
extern bool ts_guc_enable_osm_reads;
|
||||||
|
@ -55,7 +55,6 @@ static void continuous_agg_refresh_execute(const CaggRefreshState *refresh,
|
|||||||
const int32 chunk_id);
|
const int32 chunk_id);
|
||||||
static void log_refresh_window(int elevel, const ContinuousAgg *cagg,
|
static void log_refresh_window(int elevel, const ContinuousAgg *cagg,
|
||||||
const InternalTimeRange *refresh_window, const char *msg);
|
const InternalTimeRange *refresh_window, const char *msg);
|
||||||
static long materialization_per_refresh_window(void);
|
|
||||||
static void continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window,
|
static void continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window,
|
||||||
const long iteration, void *arg1_refresh,
|
const long iteration, void *arg1_refresh,
|
||||||
void *arg2_chunk_id);
|
void *arg2_chunk_id);
|
||||||
@ -336,53 +335,6 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
|
|||||||
DatumGetCString(OidFunctionCall1(outfuncid, end_ts)));
|
DatumGetCString(OidFunctionCall1(outfuncid, end_ts)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Get the limit on number of invalidation-based refreshes we allow per
|
|
||||||
* refresh call. If this limit is exceeded, fall back to a single refresh that
|
|
||||||
* covers the range decided by the min and max invalidated time.
|
|
||||||
*
|
|
||||||
* Use a session variable for debugging and testing. In other words, this
|
|
||||||
* purposefully not a user-visible GUC. Might be promoted to official GUC in
|
|
||||||
* the future.
|
|
||||||
*/
|
|
||||||
static long
|
|
||||||
materialization_per_refresh_window(void)
|
|
||||||
{
|
|
||||||
#define DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW 10
|
|
||||||
#define MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME \
|
|
||||||
MAKE_EXTOPTION("materializations_per_refresh_window")
|
|
||||||
|
|
||||||
const char *max_materializations_setting =
|
|
||||||
GetConfigOption(MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME, true, false);
|
|
||||||
long max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
|
|
||||||
|
|
||||||
if (max_materializations_setting)
|
|
||||||
{
|
|
||||||
char *endptr = NULL;
|
|
||||||
|
|
||||||
/* Not using pg_strtol here since we don't want to throw error in case
|
|
||||||
* of parsing issue */
|
|
||||||
max_materializations = strtol(max_materializations_setting, &endptr, 10);
|
|
||||||
|
|
||||||
/* Accept trailing whitespaces */
|
|
||||||
while (*endptr == ' ')
|
|
||||||
endptr++;
|
|
||||||
|
|
||||||
if (*endptr != '\0')
|
|
||||||
{
|
|
||||||
ereport(WARNING,
|
|
||||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
||||||
errmsg("invalid value for session variable \"%s\"",
|
|
||||||
MATERIALIZATIONS_PER_REFRESH_WINDOW_OPT_NAME),
|
|
||||||
errdetail("Expected an integer but current value is \"%s\".",
|
|
||||||
max_materializations_setting)));
|
|
||||||
max_materializations = DEFAULT_MATERIALIZATIONS_PER_REFRESH_WINDOW;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return max_materializations;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef void (*scan_refresh_ranges_funct_t)(const InternalTimeRange *bucketed_refresh_window,
|
typedef void (*scan_refresh_ranges_funct_t)(const InternalTimeRange *bucketed_refresh_window,
|
||||||
const long iteration, /* 0 is first range */
|
const long iteration, /* 0 is first range */
|
||||||
void *arg1, void *arg2);
|
void *arg1, void *arg2);
|
||||||
@ -661,7 +613,6 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
|
|||||||
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
|
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
|
||||||
bool do_merged_refresh = false;
|
bool do_merged_refresh = false;
|
||||||
InternalTimeRange merged_refresh_window;
|
InternalTimeRange merged_refresh_window;
|
||||||
long max_materializations;
|
|
||||||
|
|
||||||
/* Lock the continuous aggregate's materialized hypertable to protect
|
/* Lock the continuous aggregate's materialized hypertable to protect
|
||||||
* against concurrent refreshes. Only concurrent reads will be
|
* against concurrent refreshes. Only concurrent reads will be
|
||||||
@ -674,12 +625,11 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
|
|||||||
LockRelationOid(hyper_relid, ExclusiveLock);
|
LockRelationOid(hyper_relid, ExclusiveLock);
|
||||||
const CaggsInfo all_caggs_info =
|
const CaggsInfo all_caggs_info =
|
||||||
ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id);
|
ts_continuous_agg_get_all_caggs_info(cagg->data.raw_hypertable_id);
|
||||||
max_materializations = materialization_per_refresh_window();
|
|
||||||
invalidations = invalidation_process_cagg_log(cagg->data.mat_hypertable_id,
|
invalidations = invalidation_process_cagg_log(cagg->data.mat_hypertable_id,
|
||||||
cagg->data.raw_hypertable_id,
|
cagg->data.raw_hypertable_id,
|
||||||
refresh_window,
|
refresh_window,
|
||||||
&all_caggs_info,
|
&all_caggs_info,
|
||||||
max_materializations,
|
ts_guc_cagg_max_individual_materializations,
|
||||||
&do_merged_refresh,
|
&do_merged_refresh,
|
||||||
&merged_refresh_window);
|
&merged_refresh_window);
|
||||||
|
|
||||||
|
@ -1201,32 +1201,33 @@ INSERT INTO conditions VALUES (120, 1, 1.0);
|
|||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
\set VERBOSITY default
|
\set VERBOSITY default
|
||||||
|
\set ON_ERROR_STOP 0
|
||||||
-- Test acceptable values for materializations per refresh
|
-- Test acceptable values for materializations per refresh
|
||||||
SET timescaledb.materializations_per_refresh_window=' 5 ';
|
SET timescaledb.materializations_per_refresh_window=' 5 ';
|
||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
-- Large value will be treated as LONG_MAX
|
-- Large value will be treated as LONG_MAX
|
||||||
SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
|
SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
|
||||||
|
ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "342239897234023842394249234766923492347"
|
||||||
|
HINT: Value exceeds integer range.
|
||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
-- Test bad values for materializations per refresh
|
-- Test bad values for materializations per refresh
|
||||||
SET timescaledb.materializations_per_refresh_window='foo';
|
SET timescaledb.materializations_per_refresh_window='foo';
|
||||||
|
ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "foo"
|
||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
|
|
||||||
DETAIL: Expected an integer but current value is "foo".
|
|
||||||
SET timescaledb.materializations_per_refresh_window='2bar';
|
SET timescaledb.materializations_per_refresh_window='2bar';
|
||||||
|
ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "2bar"
|
||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
|
|
||||||
DETAIL: Expected an integer but current value is "2bar".
|
|
||||||
SET timescaledb.materializations_per_refresh_window='-';
|
SET timescaledb.materializations_per_refresh_window='-';
|
||||||
|
ERROR: invalid value for parameter "timescaledb.materializations_per_refresh_window": "-"
|
||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
|
|
||||||
DETAIL: Expected an integer but current value is "-".
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
RESET timescaledb.materializations_per_refresh_window;
|
RESET timescaledb.materializations_per_refresh_window;
|
||||||
|
\set ON_ERROR_STOP 1
|
||||||
-- Test refresh with undefined invalidation threshold and variable sized buckets
|
-- Test refresh with undefined invalidation threshold and variable sized buckets
|
||||||
CREATE TABLE timestamp_ht (
|
CREATE TABLE timestamp_ht (
|
||||||
time timestamptz NOT NULL,
|
time timestamptz NOT NULL,
|
||||||
|
@ -698,6 +698,7 @@ INSERT INTO conditions VALUES (140, 1, 1.0);
|
|||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
|
|
||||||
\set VERBOSITY default
|
\set VERBOSITY default
|
||||||
|
\set ON_ERROR_STOP 0
|
||||||
-- Test acceptable values for materializations per refresh
|
-- Test acceptable values for materializations per refresh
|
||||||
SET timescaledb.materializations_per_refresh_window=' 5 ';
|
SET timescaledb.materializations_per_refresh_window=' 5 ';
|
||||||
INSERT INTO conditions VALUES (140, 1, 1.0);
|
INSERT INTO conditions VALUES (140, 1, 1.0);
|
||||||
@ -720,6 +721,7 @@ INSERT INTO conditions VALUES (140, 1, 1.0);
|
|||||||
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
CALL refresh_continuous_aggregate('cond_10', 0, 200);
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
RESET timescaledb.materializations_per_refresh_window;
|
RESET timescaledb.materializations_per_refresh_window;
|
||||||
|
\set ON_ERROR_STOP 1
|
||||||
|
|
||||||
-- Test refresh with undefined invalidation threshold and variable sized buckets
|
-- Test refresh with undefined invalidation threshold and variable sized buckets
|
||||||
CREATE TABLE timestamp_ht (
|
CREATE TABLE timestamp_ht (
|
||||||
|
Loading…
x
Reference in New Issue
Block a user