From d878b9ee29f291c36c5aa6c3fa490fb4538af717 Mon Sep 17 00:00:00 2001
From: Jan Nidzwetzki
Date: Mon, 29 Jan 2024 09:09:01 +0100
Subject: [PATCH] Fix pathtarget adjustment for MergeAppend paths

The chunk-wise aggregation pushdown code copies existing paths to
create new ones with pushed-down aggregates. However, the
copy_merge_append_path function behaves differently from the other
copy functions (e.g., copy_append_path): it resets the pathtarget on
the copy. This leads to a wrong pathlist and crashes. This PR fixes
the issue by setting the pathtarget explicitly after the path has
been copied.
---
 .unreleased/fix_6571                        |  1 +
 src/nodes/chunk_append/chunk_append.c       |  3 +-
 src/nodes/chunk_append/chunk_append.h       |  3 +-
 src/planner/partialize.c                    | 24 +++---
 tsl/src/nodes/skip_scan/planner.c           |  2 +-
 tsl/test/expected/agg_partials_pushdown.out | 85 +++++++++++++++++++++
 tsl/test/sql/agg_partials_pushdown.sql      | 56 ++++++++++++++
 7 files changed, 162 insertions(+), 12 deletions(-)
 create mode 100644 .unreleased/fix_6571

diff --git a/.unreleased/fix_6571 b/.unreleased/fix_6571
new file mode 100644
index 000000000..91211fc3d
--- /dev/null
+++ b/.unreleased/fix_6571
@@ -0,0 +1 @@
+Fixes: #6571 Fix pathtarget adjustment for MergeAppend paths in aggregation pushdown code
diff --git a/src/nodes/chunk_append/chunk_append.c b/src/nodes/chunk_append/chunk_append.c
index 496e94d3a..8804ca05f 100644
--- a/src/nodes/chunk_append/chunk_append.c
+++ b/src/nodes/chunk_append/chunk_append.c
@@ -69,7 +69,7 @@ create_group_subpath(PlannerInfo *root, RelOptInfo *rel, List *group, List *path
 }
 
 ChunkAppendPath *
-ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths)
+ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths, PathTarget *pathtarget)
 {
 	ListCell *lc;
 	double total_cost = 0, rows = 0;
@@ -85,6 +85,7 @@ ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths)
 	}
 	new->cpath.path.total_cost = total_cost;
 	new->cpath.path.rows = rows;
+	new->cpath.path.pathtarget = copy_pathtarget(pathtarget);
 
 	return new;
 }
diff --git a/src/nodes/chunk_append/chunk_append.h b/src/nodes/chunk_append/chunk_append.h
index 10e4c6780..3d3426169 100644
--- a/src/nodes/chunk_append/chunk_append.h
+++ b/src/nodes/chunk_append/chunk_append.h
@@ -21,7 +21,8 @@ typedef struct ChunkAppendPath
 	int first_partial_path;
 } ChunkAppendPath;
 
-extern TSDLLEXPORT ChunkAppendPath *ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths);
+extern TSDLLEXPORT ChunkAppendPath *ts_chunk_append_path_copy(ChunkAppendPath *ca, List *subpaths,
+															  PathTarget *pathtarget);
 extern Path *ts_chunk_append_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
 										 Path *subpath, bool parallel_aware, bool ordered,
 										 List *nested_oids);
diff --git a/src/planner/partialize.c b/src/planner/partialize.c
index ca7eb180b..e22fc2af1 100644
--- a/src/planner/partialize.c
+++ b/src/planner/partialize.c
@@ -264,11 +264,13 @@ get_subpaths_from_append_path(Path *path, bool handle_gather_path)
  * Copy an AppendPath and set new subpaths.
  */
 static AppendPath *
-copy_append_path(AppendPath *path, List *subpaths)
+copy_append_path(AppendPath *path, List *subpaths, PathTarget *pathtarget)
 {
 	AppendPath *newPath = makeNode(AppendPath);
 	memcpy(newPath, path, sizeof(AppendPath));
 	newPath->subpaths = subpaths;
+	newPath->path.pathtarget = copy_pathtarget(pathtarget);
+
 	cost_append(newPath);
 
 	return newPath;
@@ -278,7 +280,8 @@ copy_append_path(AppendPath *path, List *subpaths)
  * Copy a MergeAppendPath and set new subpaths.
*/ static MergeAppendPath * -copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths) +copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths, + PathTarget *pathtarget) { MergeAppendPath *newPath = create_merge_append_path_compat(root, path->path.parent, @@ -292,6 +295,7 @@ copy_merge_append_path(PlannerInfo *root, MergeAppendPath *path, List *subpaths) #endif newPath->path.param_info = path->path.param_info; + newPath->path.pathtarget = copy_pathtarget(pathtarget); return newPath; } @@ -305,21 +309,23 @@ copy_append_like_path(PlannerInfo *root, Path *path, List *new_subpaths, PathTar if (IsA(path, AppendPath)) { AppendPath *append_path = castNode(AppendPath, path); - append_path->path.pathtarget = pathtarget; - return (Path *) copy_append_path(append_path, new_subpaths); + AppendPath *new_append_path = copy_append_path(append_path, new_subpaths, pathtarget); + return &new_append_path->path; } else if (IsA(path, MergeAppendPath)) { MergeAppendPath *merge_append_path = castNode(MergeAppendPath, path); - merge_append_path->path.pathtarget = pathtarget; - return (Path *) copy_merge_append_path(root, merge_append_path, new_subpaths); + MergeAppendPath *new_merge_append_path = + copy_merge_append_path(root, merge_append_path, new_subpaths, pathtarget); + return &new_merge_append_path->path; } else if (ts_is_chunk_append_path(path)) { CustomPath *custom_path = castNode(CustomPath, path); ChunkAppendPath *chunk_append_path = (ChunkAppendPath *) custom_path; - chunk_append_path->cpath.path.pathtarget = pathtarget; - return (Path *) ts_chunk_append_path_copy(chunk_append_path, new_subpaths); + ChunkAppendPath *new_chunk_append_path = + ts_chunk_append_path_copy(chunk_append_path, new_subpaths, pathtarget); + return &new_chunk_append_path->cpath.path; } /* Should never happen, already checked by caller */ @@ -767,7 +773,7 @@ ts_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_rel if (!parse->hasAggs) return; - /* Groupting sets are not supported by the partial aggregation pushdown */ + /* Grouping sets are not supported by the partial aggregation pushdown */ if (parse->groupingSets) return; diff --git a/tsl/src/nodes/skip_scan/planner.c b/tsl/src/nodes/skip_scan/planner.c index 36efec536..aff74d288 100644 --- a/tsl/src/nodes/skip_scan/planner.c +++ b/tsl/src/nodes/skip_scan/planner.c @@ -292,7 +292,7 @@ tsl_skip_scan_paths_add(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *ou * information used for creating the original one and we don't want to * duplicate all the checks done when creating the original one. 
*/ - subpath = (Path *) ts_chunk_append_path_copy(ca, new_paths); + subpath = (Path *) ts_chunk_append_path_copy(ca, new_paths, ca->cpath.path.pathtarget); } else { diff --git a/tsl/test/expected/agg_partials_pushdown.out b/tsl/test/expected/agg_partials_pushdown.out index 2a55bda40..7dbaf7071 100644 --- a/tsl/test/expected/agg_partials_pushdown.out +++ b/tsl/test/expected/agg_partials_pushdown.out @@ -510,3 +510,88 @@ SELECT timeCustom t, min(series_0) FROM PUBLIC.testtable2 GROUP BY t ORDER BY t Worker 0: actual rows=1 loops=1 (33 rows) +RESET timescaledb.enable_chunkwise_aggregation; +RESET enable_hashagg; +-- Test aggregation pushdown with MergeAppend node +CREATE TABLE merge_append_test (start_time timestamptz, sensor_id int, cluster varchar (253), cost_recommendation_memory numeric); +SELECT * FROM create_hypertable('merge_append_test', 'start_time'); +WARNING: column type "character varying" used for "cluster" does not follow best practices +NOTICE: adding not-null constraint to column "start_time" + hypertable_id | schema_name | table_name | created +---------------+-------------+-------------------+--------- + 4 | public | merge_append_test | t +(1 row) + +CREATE INDEX merge_append_test_sensorid ON merge_append_test USING btree (start_time, sensor_id); +INSERT INTO merge_append_test +SELECT + date_series, + 1, + 'production-1', + random() * 100 + FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series +; +INSERT INTO merge_append_test +SELECT + date_series, + sensor_id, + 'production-2', + random() * 100 + FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series, +generate_series(1, 100, 1) AS sensor_id +; +ANALYZE merge_append_test; +SET enable_seqscan = off; +SET random_page_cost = 0; +SET cpu_operator_cost = 0; +SET enable_hashagg = off; +RESET parallel_setup_cost; +RESET parallel_tuple_cost; +SELECT set_config(CASE WHEN current_setting('server_version_num')::int < 160000 THEN 'force_parallel_mode' ELSE 'debug_parallel_query' END, 'off', false); + set_config +------------ + off +(1 row) + +:PREFIX +SELECT + start_time, sensor_id, + SUM(cost_recommendation_memory) +FROM + merge_append_test +WHERE + start_time >= '2023-11-27 00:00:00Z' + AND start_time <= '2023-12-01 00:00:00Z' + AND sensor_id < 10 + AND CLUSTER = 'production-2' +GROUP BY + 1, 2; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Finalize GroupAggregate (actual rows=873 loops=1) + Output: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id, sum(_hyper_4_17_chunk.cost_recommendation_memory) + Group Key: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id + -> Merge Append (actual rows=873 loops=1) + Sort Key: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id + -> Partial GroupAggregate (actual rows=648 loops=1) + Output: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id, PARTIAL sum(_hyper_4_17_chunk.cost_recommendation_memory) + Group Key: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id + -> Index Scan using _hyper_4_17_chunk_merge_append_test_sensorid on _timescaledb_internal._hyper_4_17_chunk (actual rows=648 loops=1) + Output: _hyper_4_17_chunk.start_time, _hyper_4_17_chunk.sensor_id, _hyper_4_17_chunk.cost_recommendation_memory + Index Cond: 
((_hyper_4_17_chunk.start_time >= 'Sun Nov 26 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_17_chunk.start_time <= 'Thu Nov 30 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_17_chunk.sensor_id < 10)) + Filter: ((_hyper_4_17_chunk.cluster)::text = 'production-2'::text) + Rows Removed by Filter: 72 + -> Partial GroupAggregate (actual rows=225 loops=1) + Output: _hyper_4_18_chunk.start_time, _hyper_4_18_chunk.sensor_id, PARTIAL sum(_hyper_4_18_chunk.cost_recommendation_memory) + Group Key: _hyper_4_18_chunk.start_time, _hyper_4_18_chunk.sensor_id + -> Index Scan using _hyper_4_18_chunk_merge_append_test_sensorid on _timescaledb_internal._hyper_4_18_chunk (actual rows=225 loops=1) + Output: _hyper_4_18_chunk.start_time, _hyper_4_18_chunk.sensor_id, _hyper_4_18_chunk.cost_recommendation_memory + Index Cond: ((_hyper_4_18_chunk.start_time >= 'Sun Nov 26 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_18_chunk.start_time <= 'Thu Nov 30 16:00:00 2023 PST'::timestamp with time zone) AND (_hyper_4_18_chunk.sensor_id < 10)) + Filter: ((_hyper_4_18_chunk.cluster)::text = 'production-2'::text) + Rows Removed by Filter: 25 +(21 rows) + +RESET enable_seqscan; +RESET random_page_cost; +RESET cpu_operator_cost; +RESET enable_hashagg; diff --git a/tsl/test/sql/agg_partials_pushdown.sql b/tsl/test/sql/agg_partials_pushdown.sql index f4a38771c..bc0ac7176 100644 --- a/tsl/test/sql/agg_partials_pushdown.sql +++ b/tsl/test/sql/agg_partials_pushdown.sql @@ -129,3 +129,59 @@ SELECT timeCustom t, min(series_0) FROM PUBLIC.testtable2 GROUP BY t ORDER BY t :PREFIX SELECT timeCustom t, min(series_0) FROM PUBLIC.testtable2 GROUP BY t ORDER BY t DESC NULLS LAST limit 2; + +RESET timescaledb.enable_chunkwise_aggregation; +RESET enable_hashagg; + +-- Test aggregation pushdown with MergeAppend node +CREATE TABLE merge_append_test (start_time timestamptz, sensor_id int, cluster varchar (253), cost_recommendation_memory numeric); +SELECT * FROM create_hypertable('merge_append_test', 'start_time'); +CREATE INDEX merge_append_test_sensorid ON merge_append_test USING btree (start_time, sensor_id); + +INSERT INTO merge_append_test +SELECT + date_series, + 1, + 'production-1', + random() * 100 + FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series +; + +INSERT INTO merge_append_test +SELECT + date_series, + sensor_id, + 'production-2', + random() * 100 + FROM generate_series('2023-10-01 00:00:00', '2023-12-01 00:00:00', INTERVAL '1 hour') AS date_series, +generate_series(1, 100, 1) AS sensor_id +; + +ANALYZE merge_append_test; + +SET enable_seqscan = off; +SET random_page_cost = 0; +SET cpu_operator_cost = 0; +SET enable_hashagg = off; +RESET parallel_setup_cost; +RESET parallel_tuple_cost; +SELECT set_config(CASE WHEN current_setting('server_version_num')::int < 160000 THEN 'force_parallel_mode' ELSE 'debug_parallel_query' END, 'off', false); + +:PREFIX +SELECT + start_time, sensor_id, + SUM(cost_recommendation_memory) +FROM + merge_append_test +WHERE + start_time >= '2023-11-27 00:00:00Z' + AND start_time <= '2023-12-01 00:00:00Z' + AND sensor_id < 10 + AND CLUSTER = 'production-2' +GROUP BY + 1, 2; + +RESET enable_seqscan; +RESET random_page_cost; +RESET cpu_operator_cost; +RESET enable_hashagg;
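
-- 
Illustrative note (not part of the patch): the pattern the fix applies is
easiest to see in isolation. The function below is condensed from
copy_append_path() in src/planner/partialize.c as patched above; the
_sketch suffix is ours, and the includes assume PostgreSQL's planner
headers. The key point is that the caller-supplied pathtarget is set on
the copy, so the original path in the rel's pathlist is never mutated --
previously the caller mutated the original path instead, and
copy_merge_append_path() (which rebuilds the path via
create_merge_append_path_compat) discarded that target.

#include "postgres.h"

#include "nodes/pathnodes.h"
#include "optimizer/cost.h"
#include "optimizer/tlist.h"

static AppendPath *
copy_append_path_sketch(AppendPath *path, List *subpaths, PathTarget *pathtarget)
{
	/* Start from a shallow copy of the original Append path node. */
	AppendPath *newPath = makeNode(AppendPath);
	memcpy(newPath, path, sizeof(AppendPath));

	/* Swap in the subpaths that carry the pushed-down partial aggregates. */
	newPath->subpaths = subpaths;

	/* Set the target on the copy; the original path stays untouched. */
	newPath->path.pathtarget = copy_pathtarget(pathtarget);

	/* Recompute cost and row estimates for the new subpaths. */
	cost_append(newPath);

	return newPath;
}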