From 36bff37daf68b37de3bc757d6820d86df5d82130 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Tue, 8 Oct 2019 11:12:35 +0200 Subject: [PATCH] Add support for non-partial paths to parallel ChunkAppend PostgreSQL will produce plans where a parallel append has children that are not partial paths. This patch adds support for those plans to ChunkAppend. --- src/chunk_append/chunk_append.c | 10 +++++- src/chunk_append/chunk_append.h | 1 + src/chunk_append/exec.c | 26 +++++++++++++-- src/chunk_append/exec.h | 2 ++ src/chunk_append/planner.c | 6 ++-- src/planner.c | 8 ----- test/expected/parallel-10.out | 56 +++++++++++++++++++++++++++++++++ test/expected/parallel-11.out | 56 +++++++++++++++++++++++++++++++++ test/expected/parallel-9.6.out | 46 +++++++++++++++++++++++++++ test/sql/parallel.sql.in | 16 ++++++++++ 10 files changed, 213 insertions(+), 14 deletions(-) diff --git a/src/chunk_append/chunk_append.c b/src/chunk_append/chunk_append.c index 8bda4109b..cd2746876 100644 --- a/src/chunk_append/chunk_append.c +++ b/src/chunk_append/chunk_append.c @@ -130,8 +130,16 @@ ts_chunk_append_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht, switch (nodeTag(subpath)) { case T_AppendPath: - children = castNode(AppendPath, subpath)->subpaths; + { + AppendPath *append = castNode(AppendPath, subpath); + +#if PG11_GE + if (append->path.parallel_aware && append->first_partial_path > 0) + path->first_partial_path = append->first_partial_path; +#endif + children = append->subpaths; break; + } case T_MergeAppendPath: /* * check if ordered append is applicable, only assert ordered here diff --git a/src/chunk_append/chunk_append.h b/src/chunk_append/chunk_append.h index 6a8b31bc7..62767ea33 100644 --- a/src/chunk_append/chunk_append.h +++ b/src/chunk_append/chunk_append.h @@ -19,6 +19,7 @@ typedef struct ChunkAppendPath bool runtime_exclusion; bool pushdown_limit; int limit_tuples; + int first_partial_path; } ChunkAppendPath; extern Path *ts_chunk_append_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht, diff --git a/src/chunk_append/exec.c b/src/chunk_append/exec.c index c0be28c0c..fe606a318 100644 --- a/src/chunk_append/exec.c +++ b/src/chunk_append/exec.c @@ -80,6 +80,7 @@ Node * chunk_append_state_create(CustomScan *cscan) { ChunkAppendState *state; + List *settings = linitial(cscan->custom_private); state = (ChunkAppendState *) newNode(sizeof(ChunkAppendState), T_CustomScanState); @@ -89,12 +90,14 @@ chunk_append_state_create(CustomScan *cscan) state->initial_ri_clauses = lsecond(cscan->custom_private); state->sort_options = lfourth(cscan->custom_private); - state->startup_exclusion = (bool) linitial_oid(linitial(cscan->custom_private)); - state->runtime_exclusion = (bool) lsecond_oid(linitial(cscan->custom_private)); - state->limit = lthird_oid(linitial(cscan->custom_private)); + state->startup_exclusion = (bool) linitial_int(settings); + state->runtime_exclusion = (bool) lsecond_int(settings); + state->limit = lthird_int(settings); + state->first_partial_plan = lfourth_int(settings); state->filtered_subplans = state->initial_subplans; state->filtered_ri_clauses = state->initial_ri_clauses; + state->filtered_first_partial_plan = state->first_partial_plan; state->current = INVALID_SUBPLAN_INDEX; state->choose_next_subplan = choose_next_subplan_non_parallel; @@ -115,6 +118,8 @@ do_startup_exclusion(ChunkAppendState *state) ListCell *lc_plan; ListCell *lc_clauses; ListCell *lc_constraints; + int i = -1; + int filtered_first_partial_plan = state->first_partial_plan; /* * create skeleton plannerinfo for estimate_expression_value @@ -144,6 +149,8 @@ do_startup_exclusion(ChunkAppendState *state) ListCell *lc; Scan *scan = chunk_append_get_scan_plan(lfirst(lc_plan)); + i++; + /* * If this is a base rel (chunk), check if it can be * excluded from the scan. Otherwise, fall through. @@ -159,7 +166,12 @@ do_startup_exclusion(ChunkAppendState *state) restrictinfos = constify_restrictinfos(&root, restrictinfos); if (can_exclude_chunk(lfirst(lc_constraints), restrictinfos)) + { + if (i < state->first_partial_plan) + filtered_first_partial_plan--; + continue; + } /* * if this node does runtime exclusion we keep the constified @@ -185,6 +197,7 @@ do_startup_exclusion(ChunkAppendState *state) state->filtered_subplans = filtered_children; state->filtered_ri_clauses = filtered_ri_clauses; state->filtered_constraints = filtered_constraints; + state->filtered_first_partial_plan = filtered_first_partial_plan; } /* @@ -489,6 +502,13 @@ choose_next_subplan_for_worker(ChunkAppendState *state) Assert(next_plan >= 0 && next_plan < state->num_subplans); state->current = next_plan; + /* + * if this is not a partial plan we mark it as finished + * immediately so it does not get assigned another worker + */ + if (next_plan < state->filtered_first_partial_plan) + pstate->finished[next_plan] = true; + /* advance next_plan for next worker */ pstate->next_plan = get_next_subplan(state, state->current); /* diff --git a/src/chunk_append/exec.h b/src/chunk_append/exec.h index 200ed3a2f..b75c07b1f 100644 --- a/src/chunk_append/exec.h +++ b/src/chunk_append/exec.h @@ -25,6 +25,8 @@ typedef struct ChunkAppendState MemoryContext exclusion_ctx; int num_subplans; + int first_partial_plan; + int filtered_first_partial_plan; int current; Oid ht_reloid; diff --git a/src/chunk_append/planner.c b/src/chunk_append/planner.c index 052b01d6d..0f78df2b4 100644 --- a/src/chunk_append/planner.c +++ b/src/chunk_append/planner.c @@ -261,8 +261,10 @@ chunk_append_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path, L if (capath->pushdown_limit && capath->limit_tuples > 0) limit = capath->limit_tuples; - custom_private = list_make1( - list_make3_oid((Oid) capath->startup_exclusion, (Oid) capath->runtime_exclusion, limit)); + custom_private = list_make1(list_make4_int(capath->startup_exclusion, + capath->runtime_exclusion, + limit, + capath->first_partial_path)); custom_private = lappend(custom_private, chunk_ri_clauses); custom_private = lappend(custom_private, chunk_rt_indexes); custom_private = lappend(custom_private, sort_options); diff --git a/src/planner.c b/src/planner.c index 458c9c22f..207176fc9 100644 --- a/src/planner.c +++ b/src/planner.c @@ -216,14 +216,6 @@ should_chunk_append(PlannerInfo *root, RelOptInfo *rel, Path *path, bool ordered { ListCell *lc; -#if PG11_GE - AppendPath *append = castNode(AppendPath, path); - - /* ChunkAppend doesnt support mixing partial and non-partial paths */ - if (append->path.parallel_aware && append->first_partial_path > 0) - return false; -#endif - foreach (lc, rel->baserestrictinfo) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); diff --git a/test/expected/parallel-10.out b/test/expected/parallel-10.out index 49f82d0de..9d993f73a 100644 --- a/test/expected/parallel-10.out +++ b/test/expected/parallel-10.out @@ -11,6 +11,8 @@ SELECT END AS "PREFIX", 'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE" \gset +\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk +\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk CREATE TABLE test (i int, j double precision, ts timestamp); SELECT create_hypertable('test','i',chunk_time_interval:=500000); NOTICE: adding not-null constraint to column "i" @@ -339,6 +341,60 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0; (1 row) RESET max_parallel_workers_per_gather; +-- test partial and non-partial plans +-- these will not be parallel on PG < 11 +ALTER TABLE :CHUNK1 SET (parallel_workers=0); +ALTER TABLE :CHUNK2 SET (parallel_workers=2); +:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------- + Gather (actual rows=1 loops=1) + Workers Planned: 1 + Workers Launched: 1 + Single Copy: true + -> Aggregate (actual rows=1 loops=1) + -> Result (actual rows=599999 loops=1) + One-Time Filter: (length(version()) > 0) + -> Custom Scan (ChunkAppend) on test (actual rows=599999 loops=1) + Chunks excluded during startup: 0 + -> Result (actual rows=99999 loops=1) + One-Time Filter: (length(version()) > 0) + -> Index Only Scan using _hyper_1_1_chunk_test_i_idx on _hyper_1_1_chunk (actual rows=99999 loops=1) + Index Cond: (i > 400000) + Heap Fetches: 0 + -> Result (actual rows=500000 loops=1) + One-Time Filter: (length(version()) > 0) + -> Seq Scan on _hyper_1_2_chunk (actual rows=500000 loops=1) + Filter: (i > 400000) +(18 rows) + +ALTER TABLE :CHUNK1 SET (parallel_workers=2); +ALTER TABLE :CHUNK2 SET (parallel_workers=0); +:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------- + Gather (actual rows=1 loops=1) + Workers Planned: 1 + Workers Launched: 1 + Single Copy: true + -> Aggregate (actual rows=1 loops=1) + -> Result (actual rows=600000 loops=1) + One-Time Filter: (length(version()) > 0) + -> Custom Scan (ChunkAppend) on test (actual rows=600000 loops=1) + Chunks excluded during startup: 0 + -> Result (actual rows=500000 loops=1) + One-Time Filter: (length(version()) > 0) + -> Seq Scan on _hyper_1_1_chunk (actual rows=500000 loops=1) + Filter: (i < 600000) + -> Result (actual rows=100000 loops=1) + One-Time Filter: (length(version()) > 0) + -> Index Only Scan using _hyper_1_2_chunk_test_i_idx on _hyper_1_2_chunk (actual rows=100000 loops=1) + Index Cond: (i < 600000) + Heap Fetches: 0 +(18 rows) + +ALTER TABLE :CHUNK1 RESET (parallel_workers); +ALTER TABLE :CHUNK2 RESET (parallel_workers); -- now() is not marked parallel safe in PostgreSQL < 12 so using now() -- in a query will prevent parallelism but CURRENT_TIMESTAMP and -- transaction_timestamp() are marked parallel safe diff --git a/test/expected/parallel-11.out b/test/expected/parallel-11.out index 0c031d195..0e8c6f001 100644 --- a/test/expected/parallel-11.out +++ b/test/expected/parallel-11.out @@ -11,6 +11,8 @@ SELECT END AS "PREFIX", 'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE" \gset +\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk +\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk CREATE TABLE test (i int, j double precision, ts timestamp); SELECT create_hypertable('test','i',chunk_time_interval:=500000); NOTICE: adding not-null constraint to column "i" @@ -337,6 +339,60 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0; (1 row) RESET max_parallel_workers_per_gather; +-- test partial and non-partial plans +-- these will not be parallel on PG < 11 +ALTER TABLE :CHUNK1 SET (parallel_workers=0); +ALTER TABLE :CHUNK2 SET (parallel_workers=2); +:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather (actual rows=3 loops=1) + Workers Planned: 2 + Workers Launched: 2 + -> Partial Aggregate (actual rows=1 loops=3) + -> Result (actual rows=200000 loops=3) + One-Time Filter: (length(version()) > 0) + -> Parallel Custom Scan (ChunkAppend) on test (actual rows=200000 loops=3) + Chunks excluded during startup: 0 + -> Result (actual rows=99999 loops=1) + One-Time Filter: (length(version()) > 0) + -> Index Only Scan using _hyper_1_1_chunk_test_i_idx on _hyper_1_1_chunk (actual rows=99999 loops=1) + Index Cond: (i > 400000) + Heap Fetches: 99999 + -> Result (actual rows=250000 loops=2) + One-Time Filter: (length(version()) > 0) + -> Parallel Seq Scan on _hyper_1_2_chunk (actual rows=250000 loops=2) + Filter: (i > 400000) +(18 rows) + +ALTER TABLE :CHUNK1 SET (parallel_workers=2); +ALTER TABLE :CHUNK2 SET (parallel_workers=0); +:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather (actual rows=3 loops=1) + Workers Planned: 2 + Workers Launched: 2 + -> Partial Aggregate (actual rows=1 loops=3) + -> Result (actual rows=200000 loops=3) + One-Time Filter: (length(version()) > 0) + -> Parallel Custom Scan (ChunkAppend) on test (actual rows=200000 loops=3) + Chunks excluded during startup: 0 + -> Result (actual rows=100000 loops=1) + One-Time Filter: (length(version()) > 0) + -> Index Only Scan using _hyper_1_2_chunk_test_i_idx on _hyper_1_2_chunk (actual rows=100000 loops=1) + Index Cond: (i < 600000) + Heap Fetches: 100000 + -> Result (actual rows=250000 loops=2) + One-Time Filter: (length(version()) > 0) + -> Parallel Seq Scan on _hyper_1_1_chunk (actual rows=250000 loops=2) + Filter: (i < 600000) +(18 rows) + +ALTER TABLE :CHUNK1 RESET (parallel_workers); +ALTER TABLE :CHUNK2 RESET (parallel_workers); -- now() is not marked parallel safe in PostgreSQL < 12 so using now() -- in a query will prevent parallelism but CURRENT_TIMESTAMP and -- transaction_timestamp() are marked parallel safe diff --git a/test/expected/parallel-9.6.out b/test/expected/parallel-9.6.out index 749d9641f..5ef6b2c44 100644 --- a/test/expected/parallel-9.6.out +++ b/test/expected/parallel-9.6.out @@ -11,6 +11,8 @@ SELECT END AS "PREFIX", 'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE" \gset +\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk +\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk CREATE TABLE test (i int, j double precision, ts timestamp); SELECT create_hypertable('test','i',chunk_time_interval:=500000); NOTICE: adding not-null constraint to column "i" @@ -332,6 +334,50 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0; (1 row) RESET max_parallel_workers_per_gather; +-- test partial and non-partial plans +-- these will not be parallel on PG < 11 +ALTER TABLE :CHUNK1 SET (parallel_workers=0); +ALTER TABLE :CHUNK2 SET (parallel_workers=2); +:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0; + QUERY PLAN +----------------------------------------------------------------------------------------------- + Aggregate + -> Result + One-Time Filter: (length(version()) > 0) + -> Custom Scan (ChunkAppend) on test + Chunks excluded during startup: 0 + -> Result + One-Time Filter: (length(version()) > 0) + -> Index Only Scan using _hyper_1_1_chunk_test_i_idx on _hyper_1_1_chunk + Index Cond: (i > 400000) + -> Result + One-Time Filter: (length(version()) > 0) + -> Seq Scan on _hyper_1_2_chunk + Filter: (i > 400000) +(13 rows) + +ALTER TABLE :CHUNK1 SET (parallel_workers=2); +ALTER TABLE :CHUNK2 SET (parallel_workers=0); +:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0; + QUERY PLAN +----------------------------------------------------------------------------------------------- + Aggregate + -> Result + One-Time Filter: (length(version()) > 0) + -> Custom Scan (ChunkAppend) on test + Chunks excluded during startup: 0 + -> Result + One-Time Filter: (length(version()) > 0) + -> Seq Scan on _hyper_1_1_chunk + Filter: (i < 600000) + -> Result + One-Time Filter: (length(version()) > 0) + -> Index Only Scan using _hyper_1_2_chunk_test_i_idx on _hyper_1_2_chunk + Index Cond: (i < 600000) +(13 rows) + +ALTER TABLE :CHUNK1 RESET (parallel_workers); +ALTER TABLE :CHUNK2 RESET (parallel_workers); -- now() is not marked parallel safe in PostgreSQL < 12 so using now() -- in a query will prevent parallelism but CURRENT_TIMESTAMP and -- transaction_timestamp() are marked parallel safe diff --git a/test/sql/parallel.sql.in b/test/sql/parallel.sql.in index 6f1fa3b59..ec857e606 100644 --- a/test/sql/parallel.sql.in +++ b/test/sql/parallel.sql.in @@ -14,6 +14,9 @@ SELECT 'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE" \gset +\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk +\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk + CREATE TABLE test (i int, j double precision, ts timestamp); SELECT create_hypertable('test','i',chunk_time_interval:=500000); --has to be big enough to force at least 2 workers below. @@ -91,6 +94,19 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0; RESET max_parallel_workers_per_gather; +-- test partial and non-partial plans +-- these will not be parallel on PG < 11 +ALTER TABLE :CHUNK1 SET (parallel_workers=0); +ALTER TABLE :CHUNK2 SET (parallel_workers=2); +:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0; + +ALTER TABLE :CHUNK1 SET (parallel_workers=2); +ALTER TABLE :CHUNK2 SET (parallel_workers=0); +:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0; + +ALTER TABLE :CHUNK1 RESET (parallel_workers); +ALTER TABLE :CHUNK2 RESET (parallel_workers); + -- now() is not marked parallel safe in PostgreSQL < 12 so using now() -- in a query will prevent parallelism but CURRENT_TIMESTAMP and -- transaction_timestamp() are marked parallel safe