mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-15 01:53:41 +08:00
Mark partialize_agg as parallel safe
Postgres knows whether a given aggregate is parallel-safe, and creates parallel aggregation plans based on that. The `partialize_agg` is a wrapper we use to perform partial aggregation on data nodes. It is a pure function that produces serialized aggregation state as a result. Being pure, it doesn't influence parallel safety. This means we don't need to mark it parallel-unsafe to artificially disable the parallel plans for partial aggregation. They will be chosen as usual based on the parallel-safety of the underlying aggregate function.
This commit is contained in:
parent
1d0670e703
commit
5c0110cbbf
@ -2,24 +2,30 @@
|
||||
-- Please see the included NOTICE for copyright information and
|
||||
-- LICENSE-APACHE for a copy of the license.
|
||||
|
||||
-- These wrapper functions are used to push down aggregation to data nodes.
|
||||
-- They can be marked parallel-safe, and the parallel plan will be chosen
|
||||
-- depending on whether the underlying aggregate function itself is
|
||||
-- parallel-safe.
|
||||
|
||||
-- Wrapper used to push partial aggregation down to data nodes: it returns
-- the serialized transition state of the underlying aggregate as BYTEA.
-- It is a pure function (STABLE, no side effects), so it is marked
-- PARALLEL SAFE; whether a parallel plan is actually chosen then depends
-- only on the parallel-safety of the wrapped aggregate function itself.
CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;
|
||||
-- Transition function for finalize_agg: folds one serialized partial
-- aggregation state (as produced by partialize_agg on a data node) into
-- the running transition state kept in "tstate". The inner aggregate is
-- identified by name, collation and input types so the correct combine
-- function can be looked up. IMMUTABLE and PARALLEL SAFE: the result
-- depends only on the arguments.
CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
    tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS internal
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
|
||||
-- Final function for finalize_agg: runs the inner aggregate's final
-- function on the combined transition state and returns the aggregate's
-- actual result. The "return_type_dummy_val" ANYELEMENT argument exists
-- only to make the polymorphic return type resolvable. IMMUTABLE and
-- PARALLEL SAFE, matching the transition function.
CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
    tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS anyelement
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
|
||||
-- Aggregate that combines serialized partial states received from data
-- nodes and produces the final result of the inner aggregate.
-- FINALFUNC_EXTRA passes the extra (non-state) arguments through to the
-- final function; PARALLEL = SAFE lets the planner use parallel plans
-- when the inner aggregate is itself parallel-safe.
CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
    SFUNC = _timescaledb_internal.finalize_agg_sfunc,
    STYPE = internal,
    FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
    FINALFUNC_EXTRA,
    PARALLEL = SAFE
);
|
@ -81,6 +81,7 @@ ${PSQL} -U ${TEST_PGUSER} \
|
||||
sed -e '/<exclude_from_test>/,/<\/exclude_from_test>/d' \
|
||||
-e 's!_[0-9]\{1,\}_[0-9]\{1,\}_chunk!_X_X_chunk!g' \
|
||||
-e 's!^ \{1,\}QUERY PLAN \{1,\}$!QUERY PLAN!' \
|
||||
-e 's!: actual rows!: actual rows!' \
|
||||
-e '/^-\{1,\}$/d' \
|
||||
-e 's!\(_timescaledb_internal.chunks_in([^,]\{1,\}, ARRAY\[\)[^]]\{1,\}\]!\1..]!' \
|
||||
-e 's! Memory: [0-9]\{1,\}kB!!' \
|
||||
|
104
tsl/test/shared/expected/dist_parallel_agg.out
Normal file
104
tsl/test/shared/expected/dist_parallel_agg.out
Normal file
@ -0,0 +1,104 @@
|
||||
-- This file and its contents are licensed under the Timescale License.
|
||||
-- Please see the included NOTICE for copyright information and
|
||||
-- LICENSE-TIMESCALE for a copy of the license.
|
||||
-- Test that for parallel-safe aggregate function a parallel plan is generated
|
||||
-- on data nodes, and for unsafe it is not. We use a manually created safe
|
||||
-- function and not a builtin one, to check that we can in fact create a
|
||||
-- function that is parallelized, to prevent a false negative (i.e. it's not
|
||||
-- parallelized, but for a different reason, not because it's unsafe).
|
||||
-- Create a relatively big table on one data node to test parallel plans and
|
||||
-- avoid flakiness.
|
||||
create table metrics_dist1(like metrics_dist);
|
||||
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id',
|
||||
data_nodes => '{"data_node_1"}');
|
||||
WARNING: only one data node was assigned to the hypertable
|
||||
table_name
|
||||
metrics_dist1
|
||||
(1 row)
|
||||
|
||||
insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000;
|
||||
\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);'
|
||||
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);'
|
||||
:safe
|
||||
call distributed_exec(:'safe');
|
||||
:unsafe
|
||||
call distributed_exec(:'unsafe');
|
||||
call distributed_exec($$ set parallel_tuple_cost = 0; $$);
|
||||
call distributed_exec($$ set parallel_setup_cost = 0; $$);
|
||||
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$);
|
||||
set timescaledb.enable_remote_explain = 1;
|
||||
set enable_partitionwise_aggregate = 1;
|
||||
\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)'
|
||||
:analyze
|
||||
select count(*) from metrics_dist1;
|
||||
QUERY PLAN
|
||||
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
|
||||
Output: (count(*))
|
||||
Relations: Aggregate on (public.metrics_dist1)
|
||||
Data node: data_node_1
|
||||
Fetcher Type: Row by row
|
||||
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
|
||||
Remote SQL: SELECT count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
|
||||
Remote EXPLAIN:
|
||||
Finalize Aggregate (actual rows=1 loops=1)
|
||||
Output: count(*)
|
||||
-> Gather (actual rows=2 loops=1)
|
||||
Output: (PARTIAL count(*))
|
||||
Workers Planned: 1
|
||||
Workers Launched: 1
|
||||
-> Partial Aggregate (actual rows=1 loops=2)
|
||||
Output: PARTIAL count(*)
|
||||
Worker 0: actual rows=1 loops=1
|
||||
-> Parallel Append (actual rows=10000 loops=2)
|
||||
Worker 0: actual rows=0 loops=1
|
||||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
|
||||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)
|
||||
|
||||
(22 rows)
|
||||
|
||||
:analyze
|
||||
select ts_debug_shippable_safe_count(*) from metrics_dist1;
|
||||
QUERY PLAN
|
||||
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
|
||||
Output: (ts_debug_shippable_safe_count(*))
|
||||
Relations: Aggregate on (public.metrics_dist1)
|
||||
Data node: data_node_1
|
||||
Fetcher Type: Row by row
|
||||
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
|
||||
Remote SQL: SELECT public.ts_debug_shippable_safe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
|
||||
Remote EXPLAIN:
|
||||
Finalize Aggregate (actual rows=1 loops=1)
|
||||
Output: public.ts_debug_shippable_safe_count(*)
|
||||
-> Gather (actual rows=2 loops=1)
|
||||
Output: (PARTIAL public.ts_debug_shippable_safe_count(*))
|
||||
Workers Planned: 1
|
||||
Workers Launched: 1
|
||||
-> Partial Aggregate (actual rows=1 loops=2)
|
||||
Output: PARTIAL public.ts_debug_shippable_safe_count(*)
|
||||
Worker 0: actual rows=1 loops=1
|
||||
-> Parallel Append (actual rows=10000 loops=2)
|
||||
Worker 0: actual rows=0 loops=1
|
||||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
|
||||
-> Parallel Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)
|
||||
|
||||
(22 rows)
|
||||
|
||||
:analyze
|
||||
select ts_debug_shippable_unsafe_count(*) from metrics_dist1;
|
||||
QUERY PLAN
|
||||
Custom Scan (DataNodeScan) (actual rows=1 loops=1)
|
||||
Output: (ts_debug_shippable_unsafe_count(*))
|
||||
Relations: Aggregate on (public.metrics_dist1)
|
||||
Data node: data_node_1
|
||||
Fetcher Type: Row by row
|
||||
Chunks: _dist_hyper_X_X_chunk, _dist_hyper_X_X_chunk
|
||||
Remote SQL: SELECT public.ts_debug_shippable_unsafe_count(*) FROM public.metrics_dist1 WHERE _timescaledb_internal.chunks_in(public.metrics_dist1.*, ARRAY[..])
|
||||
Remote EXPLAIN:
|
||||
Aggregate (actual rows=1 loops=1)
|
||||
Output: public.ts_debug_shippable_unsafe_count(*)
|
||||
-> Append (actual rows=20000 loops=1)
|
||||
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=17990 loops=1)
|
||||
-> Seq Scan on _timescaledb_internal._dist_hyper_X_X_chunk (actual rows=2010 loops=1)
|
||||
|
||||
(14 rows)
|
||||
|
@ -22,8 +22,8 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "14"))
|
||||
endif()
|
||||
|
||||
# Debug-only shared tests; dist_parallel_agg.sql is registered here because
# it relies on debug-build test fixtures (metrics_dist) and remote EXPLAIN.
if(CMAKE_BUILD_TYPE MATCHES Debug)
  list(APPEND TEST_FILES_SHARED dist_parallel_agg.sql dist_remote_error.sql
       timestamp_limits.sql with_clause_parser.sql)
  list(APPEND TEST_TEMPLATES_SHARED constify_now.sql.in)
endif(CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
|
||||
|
42
tsl/test/shared/sql/dist_parallel_agg.sql
Normal file
42
tsl/test/shared/sql/dist_parallel_agg.sql
Normal file
@ -0,0 +1,42 @@
|
||||
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Test that for parallel-safe aggregate function a parallel plan is generated
-- on data nodes, and for unsafe it is not. We use a manually created safe
-- function and not a builtin one, to check that we can in fact create a
-- function that is parallelized, to prevent a false negative (i.e. it's not
-- parallelized, but for a different reason, not because it's unsafe).

-- Create a relatively big table on one data node to test parallel plans and
-- avoid flakiness.
create table metrics_dist1(like metrics_dist);
select table_name from create_distributed_hypertable('metrics_dist1', 'time', 'device_id',
    data_nodes => '{"data_node_1"}');
insert into metrics_dist1 select * from metrics_dist order by metrics_dist limit 20000;

-- One count() aggregate marked parallel-safe and an identical one marked
-- parallel-unsafe; both must exist on the access node and the data node.
\set safe 'create or replace aggregate ts_debug_shippable_safe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = safe);'
\set unsafe 'create or replace aggregate ts_debug_shippable_unsafe_count(*) (sfunc = int8inc, combinefunc=int8pl, stype = bigint, initcond = 0, parallel = unsafe);'

:safe
call distributed_exec(:'safe');
:unsafe
call distributed_exec(:'unsafe');

-- Make parallel plans as cheap as possible on the data node so the planner
-- picks them whenever they are legal; cap workers at 1 for stable EXPLAIN.
call distributed_exec($$ set parallel_tuple_cost = 0; $$);
call distributed_exec($$ set parallel_setup_cost = 0; $$);
call distributed_exec($$ set max_parallel_workers_per_gather = 1; $$);

set timescaledb.enable_remote_explain = 1;
set enable_partitionwise_aggregate = 1;

\set analyze 'explain (analyze, verbose, costs off, timing off, summary off)'

:analyze
select count(*) from metrics_dist1;

:analyze
select ts_debug_shippable_safe_count(*) from metrics_dist1;

:analyze
select ts_debug_shippable_unsafe_count(*) from metrics_dist1;
|
Loading…
x
Reference in New Issue
Block a user