diff --git a/sql/partialize_finalize.sql b/sql/partialize_finalize.sql
index a41cd97c3..8ec8e914c 100644
--- a/sql/partialize_finalize.sql
+++ b/sql/partialize_finalize.sql
@@ -2,30 +2,24 @@
 -- Please see the included NOTICE for copyright information and
 -- LICENSE-APACHE for a copy of the license.
 
--- These wrapper functions are used to push down aggregation to data nodes.
--- They can be marked parallel-safe, and the parallel plan will be chosen
--- depending on whether the underlying aggregate function itself is
--- parallel-safe.
-
 CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
-RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;
+RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C VOLATILE;
 
 CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
 tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
 RETURNS internal
 AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
-LANGUAGE C IMMUTABLE PARALLEL SAFE;
+LANGUAGE C IMMUTABLE;
 
 CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
 tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
 RETURNS anyelement
 AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
-LANGUAGE C IMMUTABLE PARALLEL SAFE;
+LANGUAGE C IMMUTABLE;
 
 CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
     SFUNC = _timescaledb_internal.finalize_agg_sfunc,
    STYPE = internal,
    FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
-    FINALFUNC_EXTRA,
-    PARALLEL = SAFE
+    FINALFUNC_EXTRA
 );
diff --git a/tsl/test/expected/partialize_finalize.out b/tsl/test/expected/partialize_finalize.out
index 59740dc4f..eb46e5eb1 100644
--- a/tsl/test/expected/partialize_finalize.out
+++ b/tsl/test/expected/partialize_finalize.out
@@ -392,3 +392,296 @@ WARNING:  type bigint
 t
 (1 row)
 
+-- Issue 4922
+CREATE TABLE issue4922 (
+  time TIMESTAMPTZ NOT NULL,
+  value INTEGER
+);
+SELECT create_hypertable('issue4922', 'time');
+   create_hypertable
+------------------------
+ (2,public,issue4922,t)
+(1 row)
+
+-- helper function: integer -> pseudorandom integer [0..100].
+CREATE OR REPLACE FUNCTION mix(x INTEGER) RETURNS INTEGER AS $$ SELECT (((hashint4(x) / (pow(2, 31) - 1) + 1) / 2) * 100)::INTEGER $$ LANGUAGE SQL;
+INSERT INTO issue4922 (time, value)
+SELECT '2022-01-01 00:00:00-03'::timestamptz + interval '1 year' * mix(x), mix(x)
+FROM generate_series(1, 100000) x(x);
+SET force_parallel_mode = 'on';
+SET parallel_setup_cost = 0;
+SELECT
+  sum(value),
+  avg(value),
+  min(value),
+  max(value),
+  count(*)
+FROM issue4922;
+   sum   |         avg         | min | max | count
+---------+---------------------+-----+-----+--------
+ 5001129 | 50.0112900000000000 |   0 | 100 | 100000
+(1 row)
+
+-- The results should be equal to the previous query
+SELECT
+  _timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
+  _timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
+  _timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
+  _timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
+  _timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
+FROM (
+  SELECT
+    _timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
+    _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
+    _timescaledb_internal.partialize_agg(min(value)) AS partial_min,
+    _timescaledb_internal.partialize_agg(max(value)) AS partial_max,
+    _timescaledb_internal.partialize_agg(count(*)) AS partial_count
+  FROM public.issue4922) AS a;
+   sum   |         avg         | min | max | count
+---------+---------------------+-----+-----+--------
+ 5001129 | 50.0112900000000000 |   0 | 100 | 100000
+(1 row)
+
+-- Check for parallel planning
+EXPLAIN (COSTS OFF)
+SELECT
+  sum(value),
+  avg(value),
+  min(value),
+  max(value),
+  count(*)
+FROM issue4922;
+                           QUERY PLAN
+-----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Append
+                     ->  Parallel Seq Scan on _hyper_2_4_chunk
+                     ->  Parallel Seq Scan on _hyper_2_5_chunk
+                     ->  Parallel Seq Scan on _hyper_2_6_chunk
+                     ->  Parallel Seq Scan on _hyper_2_7_chunk
+                     ->  Parallel Seq Scan on _hyper_2_8_chunk
+                     ->  Parallel Seq Scan on _hyper_2_9_chunk
+                     ->  Parallel Seq Scan on _hyper_2_10_chunk
+                     ->  Parallel Seq Scan on _hyper_2_11_chunk
+                     ->  Parallel Seq Scan on _hyper_2_12_chunk
+                     ->  Parallel Seq Scan on _hyper_2_13_chunk
+                     ->  Parallel Seq Scan on _hyper_2_14_chunk
+                     ->  Parallel Seq Scan on _hyper_2_15_chunk
+                     ->  Parallel Seq Scan on _hyper_2_16_chunk
+                     ->  Parallel Seq Scan on _hyper_2_17_chunk
+                     ->  Parallel Seq Scan on _hyper_2_18_chunk
+                     ->  Parallel Seq Scan on _hyper_2_19_chunk
+                     ->  Parallel Seq Scan on _hyper_2_20_chunk
+                     ->  Parallel Seq Scan on _hyper_2_21_chunk
+                     ->  Parallel Seq Scan on _hyper_2_22_chunk
+                     ->  Parallel Seq Scan on _hyper_2_23_chunk
+                     ->  Parallel Seq Scan on _hyper_2_24_chunk
+                     ->  Parallel Seq Scan on _hyper_2_25_chunk
+                     ->  Parallel Seq Scan on _hyper_2_26_chunk
+                     ->  Parallel Seq Scan on _hyper_2_27_chunk
+                     ->  Parallel Seq Scan on _hyper_2_28_chunk
+                     ->  Parallel Seq Scan on _hyper_2_29_chunk
+                     ->  Parallel Seq Scan on _hyper_2_30_chunk
+                     ->  Parallel Seq Scan on _hyper_2_31_chunk
+                     ->  Parallel Seq Scan on _hyper_2_32_chunk
+                     ->  Parallel Seq Scan on _hyper_2_33_chunk
+                     ->  Parallel Seq Scan on _hyper_2_34_chunk
+                     ->  Parallel Seq Scan on _hyper_2_35_chunk
+                     ->  Parallel Seq Scan on _hyper_2_36_chunk
+                     ->  Parallel Seq Scan on _hyper_2_37_chunk
+                     ->  Parallel Seq Scan on _hyper_2_38_chunk
+                     ->  Parallel Seq Scan on _hyper_2_39_chunk
+                     ->  Parallel Seq Scan on _hyper_2_40_chunk
+                     ->  Parallel Seq Scan on _hyper_2_41_chunk
+                     ->  Parallel Seq Scan on _hyper_2_42_chunk
+                     ->  Parallel Seq Scan on _hyper_2_43_chunk
+                     ->  Parallel Seq Scan on _hyper_2_44_chunk
+                     ->  Parallel Seq Scan on _hyper_2_45_chunk
+                     ->  Parallel Seq Scan on _hyper_2_46_chunk
+                     ->  Parallel Seq Scan on _hyper_2_47_chunk
+                     ->  Parallel Seq Scan on _hyper_2_48_chunk
+                     ->  Parallel Seq Scan on _hyper_2_49_chunk
+                     ->  Parallel Seq Scan on _hyper_2_50_chunk
+                     ->  Parallel Seq Scan on _hyper_2_51_chunk
+                     ->  Parallel Seq Scan on _hyper_2_52_chunk
+                     ->  Parallel Seq Scan on _hyper_2_53_chunk
+                     ->  Parallel Seq Scan on _hyper_2_54_chunk
+                     ->  Parallel Seq Scan on _hyper_2_55_chunk
+                     ->  Parallel Seq Scan on _hyper_2_56_chunk
+                     ->  Parallel Seq Scan on _hyper_2_57_chunk
+                     ->  Parallel Seq Scan on _hyper_2_58_chunk
+                     ->  Parallel Seq Scan on _hyper_2_59_chunk
+                     ->  Parallel Seq Scan on _hyper_2_60_chunk
+                     ->  Parallel Seq Scan on _hyper_2_61_chunk
+                     ->  Parallel Seq Scan on _hyper_2_62_chunk
+                     ->  Parallel Seq Scan on _hyper_2_63_chunk
+                     ->  Parallel Seq Scan on _hyper_2_64_chunk
+                     ->  Parallel Seq Scan on _hyper_2_65_chunk
+                     ->  Parallel Seq Scan on _hyper_2_66_chunk
+                     ->  Parallel Seq Scan on _hyper_2_67_chunk
+                     ->  Parallel Seq Scan on _hyper_2_68_chunk
+                     ->  Parallel Seq Scan on _hyper_2_69_chunk
+                     ->  Parallel Seq Scan on _hyper_2_70_chunk
+                     ->  Parallel Seq Scan on _hyper_2_71_chunk
+                     ->  Parallel Seq Scan on _hyper_2_72_chunk
+                     ->  Parallel Seq Scan on _hyper_2_73_chunk
+                     ->  Parallel Seq Scan on _hyper_2_74_chunk
+                     ->  Parallel Seq Scan on _hyper_2_75_chunk
+                     ->  Parallel Seq Scan on _hyper_2_76_chunk
+                     ->  Parallel Seq Scan on _hyper_2_77_chunk
+                     ->  Parallel Seq Scan on _hyper_2_78_chunk
+                     ->  Parallel Seq Scan on _hyper_2_79_chunk
+                     ->  Parallel Seq Scan on _hyper_2_80_chunk
+                     ->  Parallel Seq Scan on _hyper_2_81_chunk
+                     ->  Parallel Seq Scan on _hyper_2_82_chunk
+                     ->  Parallel Seq Scan on _hyper_2_83_chunk
+                     ->  Parallel Seq Scan on _hyper_2_84_chunk
+                     ->  Parallel Seq Scan on _hyper_2_85_chunk
+                     ->  Parallel Seq Scan on _hyper_2_86_chunk
+                     ->  Parallel Seq Scan on _hyper_2_87_chunk
+                     ->  Parallel Seq Scan on _hyper_2_88_chunk
+                     ->  Parallel Seq Scan on _hyper_2_89_chunk
+                     ->  Parallel Seq Scan on _hyper_2_90_chunk
+                     ->  Parallel Seq Scan on _hyper_2_91_chunk
+                     ->  Parallel Seq Scan on _hyper_2_92_chunk
+                     ->  Parallel Seq Scan on _hyper_2_93_chunk
+                     ->  Parallel Seq Scan on _hyper_2_94_chunk
+                     ->  Parallel Seq Scan on _hyper_2_95_chunk
+                     ->  Parallel Seq Scan on _hyper_2_96_chunk
+                     ->  Parallel Seq Scan on _hyper_2_97_chunk
+                     ->  Parallel Seq Scan on _hyper_2_98_chunk
+                     ->  Parallel Seq Scan on _hyper_2_99_chunk
+                     ->  Parallel Seq Scan on _hyper_2_100_chunk
+                     ->  Parallel Seq Scan on _hyper_2_101_chunk
+                     ->  Parallel Seq Scan on _hyper_2_102_chunk
+                     ->  Parallel Seq Scan on _hyper_2_103_chunk
+                     ->  Parallel Seq Scan on _hyper_2_104_chunk
+(106 rows)
+
+-- Make sure that even when forcing parallel mode these functions are not parallel safe
+EXPLAIN (COSTS OFF)
+SELECT
+  _timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
+  _timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
+  _timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
+  _timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
+  _timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
+FROM (
+  SELECT
+    _timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
+    _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
+    _timescaledb_internal.partialize_agg(min(value)) AS partial_min,
+    _timescaledb_internal.partialize_agg(max(value)) AS partial_max,
+    _timescaledb_internal.partialize_agg(count(*)) AS partial_count
+  FROM public.issue4922) AS a;
+                    QUERY PLAN
+--------------------------------------------------
+ Aggregate
+   ->  Partial Aggregate
+         ->  Append
+               ->  Seq Scan on _hyper_2_4_chunk
+               ->  Seq Scan on _hyper_2_5_chunk
+               ->  Seq Scan on _hyper_2_6_chunk
+               ->  Seq Scan on _hyper_2_7_chunk
+               ->  Seq Scan on _hyper_2_8_chunk
+               ->  Seq Scan on _hyper_2_9_chunk
+               ->  Seq Scan on _hyper_2_10_chunk
+               ->  Seq Scan on _hyper_2_11_chunk
+               ->  Seq Scan on _hyper_2_12_chunk
+               ->  Seq Scan on _hyper_2_13_chunk
+               ->  Seq Scan on _hyper_2_14_chunk
+               ->  Seq Scan on _hyper_2_15_chunk
+               ->  Seq Scan on _hyper_2_16_chunk
+               ->  Seq Scan on _hyper_2_17_chunk
+               ->  Seq Scan on _hyper_2_18_chunk
+               ->  Seq Scan on _hyper_2_19_chunk
+               ->  Seq Scan on _hyper_2_20_chunk
+               ->  Seq Scan on _hyper_2_21_chunk
+               ->  Seq Scan on _hyper_2_22_chunk
+               ->  Seq Scan on _hyper_2_23_chunk
+               ->  Seq Scan on _hyper_2_24_chunk
+               ->  Seq Scan on _hyper_2_25_chunk
+               ->  Seq Scan on _hyper_2_26_chunk
+               ->  Seq Scan on _hyper_2_27_chunk
+               ->  Seq Scan on _hyper_2_28_chunk
+               ->  Seq Scan on _hyper_2_29_chunk
+               ->  Seq Scan on _hyper_2_30_chunk
+               ->  Seq Scan on _hyper_2_31_chunk
+               ->  Seq Scan on _hyper_2_32_chunk
+               ->  Seq Scan on _hyper_2_33_chunk
+               ->  Seq Scan on _hyper_2_34_chunk
+               ->  Seq Scan on _hyper_2_35_chunk
+               ->  Seq Scan on _hyper_2_36_chunk
+               ->  Seq Scan on _hyper_2_37_chunk
+               ->  Seq Scan on _hyper_2_38_chunk
+               ->  Seq Scan on _hyper_2_39_chunk
+               ->  Seq Scan on _hyper_2_40_chunk
+               ->  Seq Scan on _hyper_2_41_chunk
+               ->  Seq Scan on _hyper_2_42_chunk
+               ->  Seq Scan on _hyper_2_43_chunk
+               ->  Seq Scan on _hyper_2_44_chunk
+               ->  Seq Scan on _hyper_2_45_chunk
+               ->  Seq Scan on _hyper_2_46_chunk
+               ->  Seq Scan on _hyper_2_47_chunk
+               ->  Seq Scan on _hyper_2_48_chunk
+               ->  Seq Scan on _hyper_2_49_chunk
+               ->  Seq Scan on _hyper_2_50_chunk
+               ->  Seq Scan on _hyper_2_51_chunk
+               ->  Seq Scan on _hyper_2_52_chunk
+               ->  Seq Scan on _hyper_2_53_chunk
+               ->  Seq Scan on _hyper_2_54_chunk
+               ->  Seq Scan on _hyper_2_55_chunk
+               ->  Seq Scan on _hyper_2_56_chunk
+               ->  Seq Scan on _hyper_2_57_chunk
+               ->  Seq Scan on _hyper_2_58_chunk
+               ->  Seq Scan on _hyper_2_59_chunk
+               ->  Seq Scan on _hyper_2_60_chunk
+               ->  Seq Scan on _hyper_2_61_chunk
+               ->  Seq Scan on _hyper_2_62_chunk
+               ->  Seq Scan on _hyper_2_63_chunk
+               ->  Seq Scan on _hyper_2_64_chunk
+               ->  Seq Scan on _hyper_2_65_chunk
+               ->  Seq Scan on _hyper_2_66_chunk
+               ->  Seq Scan on _hyper_2_67_chunk
+               ->  Seq Scan on _hyper_2_68_chunk
+               ->  Seq Scan on _hyper_2_69_chunk
+               ->  Seq Scan on _hyper_2_70_chunk
+               ->  Seq Scan on _hyper_2_71_chunk
+               ->  Seq Scan on _hyper_2_72_chunk
+               ->  Seq Scan on _hyper_2_73_chunk
+               ->  Seq Scan on _hyper_2_74_chunk
+               ->  Seq Scan on _hyper_2_75_chunk
+               ->  Seq Scan on _hyper_2_76_chunk
+               ->  Seq Scan on _hyper_2_77_chunk
+               ->  Seq Scan on _hyper_2_78_chunk
+               ->  Seq Scan on _hyper_2_79_chunk
+               ->  Seq Scan on _hyper_2_80_chunk
+               ->  Seq Scan on _hyper_2_81_chunk
+               ->  Seq Scan on _hyper_2_82_chunk
+               ->  Seq Scan on _hyper_2_83_chunk
+               ->  Seq Scan on _hyper_2_84_chunk
+               ->  Seq Scan on _hyper_2_85_chunk
+               ->  Seq Scan on _hyper_2_86_chunk
+               ->  Seq Scan on _hyper_2_87_chunk
+               ->  Seq Scan on _hyper_2_88_chunk
+               ->  Seq Scan on _hyper_2_89_chunk
+               ->  Seq Scan on _hyper_2_90_chunk
+               ->  Seq Scan on _hyper_2_91_chunk
+               ->  Seq Scan on _hyper_2_92_chunk
+               ->  Seq Scan on _hyper_2_93_chunk
+               ->  Seq Scan on _hyper_2_94_chunk
+               ->  Seq Scan on _hyper_2_95_chunk
+               ->  Seq Scan on _hyper_2_96_chunk
+               ->  Seq Scan on _hyper_2_97_chunk
+               ->  Seq Scan on _hyper_2_98_chunk
+               ->  Seq Scan on _hyper_2_99_chunk
+               ->  Seq Scan on _hyper_2_100_chunk
+               ->  Seq Scan on _hyper_2_101_chunk
+               ->  Seq Scan on _hyper_2_102_chunk
+               ->  Seq Scan on _hyper_2_103_chunk
+               ->  Seq Scan on _hyper_2_104_chunk
+(104 rows)
+
diff --git a/tsl/test/sql/partialize_finalize.sql b/tsl/test/sql/partialize_finalize.sql
index 2aebb201d..746022abc 100644
--- a/tsl/test/sql/partialize_finalize.sql
+++ b/tsl/test/sql/partialize_finalize.sql
@@ -296,3 +296,73 @@ select _timescaledb_internal.finalize_agg( 'aggregate_to_test_ffunc_extra(int, a
 with cte as (SELECT _timescaledb_internal.partialize_agg(aggregate_to_test_ffunc_extra(8, 1::bigint)) as part)
 select _timescaledb_internal.finalize_agg( 'aggregate_to_test_ffunc_extra(int, anyelement)', null, null, array[array['pg_catalog'::name, 'int4'::name], array['pg_catalog', 'int8']], part, null::text) is null from cte;
 
+
+
+-- Issue 4922
+CREATE TABLE issue4922 (
+  time TIMESTAMPTZ NOT NULL,
+  value INTEGER
+);
+
+SELECT create_hypertable('issue4922', 'time');
+
+-- helper function: integer -> pseudorandom integer [0..100].
+CREATE OR REPLACE FUNCTION mix(x INTEGER) RETURNS INTEGER AS $$ SELECT (((hashint4(x) / (pow(2, 31) - 1) + 1) / 2) * 100)::INTEGER $$ LANGUAGE SQL;
+
+INSERT INTO issue4922 (time, value)
+SELECT '2022-01-01 00:00:00-03'::timestamptz + interval '1 year' * mix(x), mix(x)
+FROM generate_series(1, 100000) x(x);
+
+SET force_parallel_mode = 'on';
+SET parallel_setup_cost = 0;
+
+SELECT
+  sum(value),
+  avg(value),
+  min(value),
+  max(value),
+  count(*)
+FROM issue4922;
+
+-- The results should be equal to the previous query
+SELECT
+  _timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
+  _timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
+  _timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
+  _timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
+  _timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
+FROM (
+  SELECT
+    _timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
+    _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
+    _timescaledb_internal.partialize_agg(min(value)) AS partial_min,
+    _timescaledb_internal.partialize_agg(max(value)) AS partial_max,
+    _timescaledb_internal.partialize_agg(count(*)) AS partial_count
+  FROM public.issue4922) AS a;
+
+-- Check for parallel planning
+EXPLAIN (COSTS OFF)
+SELECT
+  sum(value),
+  avg(value),
+  min(value),
+  max(value),
+  count(*)
+FROM issue4922;
+
+-- Make sure that even when forcing parallel mode these functions are not parallel safe
+EXPLAIN (COSTS OFF)
+SELECT
+  _timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
+  _timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
+  _timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
+  _timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
+  _timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
+FROM (
+  SELECT
+    _timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
+    _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
+    _timescaledb_internal.partialize_agg(min(value)) AS partial_min,
+    _timescaledb_internal.partialize_agg(max(value)) AS partial_max,
+    _timescaledb_internal.partialize_agg(count(*)) AS partial_count
+  FROM public.issue4922) AS a;
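For reference, a minimal sketch of the partialize/finalize round trip exercised above, reduced to a single aggregate. This is not part of the patch; it reuses the issue4922 table and the same argument conventions as the test (NULL collation schema/name, the inner aggregate's input types, and a NULL dummy value that fixes the result type):

SELECT _timescaledb_internal.finalize_agg(
           'pg_catalog.avg(integer)'::text, -- aggregate whose partial state is finalized
           NULL::name, NULL::name,          -- collation schema/name (none for an integer avg)
           '{{pg_catalog,int4}}'::name[],   -- input types of the inner aggregate
           partial_avg,                     -- serialized partial state (BYTEA) from the subquery
           NULL::numeric) AS avg            -- dummy value that fixes the return type
FROM (
  SELECT _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg
  FROM issue4922) AS a;

With partialize_agg now VOLATILE and the finalize support functions no longer PARALLEL SAFE, the planner keeps this pattern out of parallel workers (the second EXPLAIN above), while plain aggregates over the hypertable still get the parallel plan shown in the first EXPLAIN.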