Add support for non-partial paths to parallel ChunkAppend

Since PostgreSQL 11 the planner can produce parallel append plans
whose children are not all partial paths. This patch adds support
for such plans to ChunkAppend.
Sven Klemm 2019-10-08 11:12:35 +02:00 committed by Sven Klemm
parent fc9a3e8bc6
commit 36bff37daf
10 changed files with 213 additions and 14 deletions
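For context (this note is not part of the commit): since PostgreSQL 11 a parallel-aware Append orders its subpaths so that all non-partial paths come first, and first_partial_path stores the index of the first partial one. Subplans below that index must be executed by exactly one process, while the partial ones can be scanned cooperatively by all workers. The snippet below is only a minimal, hypothetical illustration of that layout; the MixedAppend struct and subplan_is_partial helper are invented for this sketch and are not TimescaleDB or PostgreSQL code.

/* Hypothetical sketch: how a first_partial_plan boundary splits children. */
#include <stdbool.h>
#include <stdio.h>

typedef struct MixedAppend
{
	int num_subplans;       /* total number of children */
	int first_partial_plan; /* children[0 .. first_partial_plan-1] are non-partial */
} MixedAppend;

static bool
subplan_is_partial(const MixedAppend *a, int i)
{
	return i >= a->first_partial_plan;
}

int
main(void)
{
	MixedAppend a = {.num_subplans = 3, .first_partial_plan = 1};
	int i;

	for (i = 0; i < a.num_subplans; i++)
		printf("subplan %d: %s\n", i,
			   subplan_is_partial(&a, i) ? "partial (shared by workers)"
										 : "non-partial (single process)");
	return 0;
}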


@@ -130,8 +130,16 @@ ts_chunk_append_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
switch (nodeTag(subpath))
{
case T_AppendPath:
children = castNode(AppendPath, subpath)->subpaths;
{
AppendPath *append = castNode(AppendPath, subpath);
#if PG11_GE
if (append->path.parallel_aware && append->first_partial_path > 0)
path->first_partial_path = append->first_partial_path;
#endif
children = append->subpaths;
break;
}
case T_MergeAppendPath:
/*
* check if ordered append is applicable, only assert ordered here


@@ -19,6 +19,7 @@ typedef struct ChunkAppendPath
bool runtime_exclusion;
bool pushdown_limit;
int limit_tuples;
int first_partial_path;
} ChunkAppendPath;
extern Path *ts_chunk_append_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,


@@ -80,6 +80,7 @@ Node *
chunk_append_state_create(CustomScan *cscan)
{
ChunkAppendState *state;
List *settings = linitial(cscan->custom_private);
state = (ChunkAppendState *) newNode(sizeof(ChunkAppendState), T_CustomScanState);
@@ -89,12 +90,14 @@ chunk_append_state_create(CustomScan *cscan)
state->initial_ri_clauses = lsecond(cscan->custom_private);
state->sort_options = lfourth(cscan->custom_private);
state->startup_exclusion = (bool) linitial_oid(linitial(cscan->custom_private));
state->runtime_exclusion = (bool) lsecond_oid(linitial(cscan->custom_private));
state->limit = lthird_oid(linitial(cscan->custom_private));
state->startup_exclusion = (bool) linitial_int(settings);
state->runtime_exclusion = (bool) lsecond_int(settings);
state->limit = lthird_int(settings);
state->first_partial_plan = lfourth_int(settings);
state->filtered_subplans = state->initial_subplans;
state->filtered_ri_clauses = state->initial_ri_clauses;
state->filtered_first_partial_plan = state->first_partial_plan;
state->current = INVALID_SUBPLAN_INDEX;
state->choose_next_subplan = choose_next_subplan_non_parallel;
@@ -115,6 +118,8 @@ do_startup_exclusion(ChunkAppendState *state)
ListCell *lc_plan;
ListCell *lc_clauses;
ListCell *lc_constraints;
int i = -1;
int filtered_first_partial_plan = state->first_partial_plan;
/*
* create skeleton plannerinfo for estimate_expression_value
@@ -144,6 +149,8 @@ do_startup_exclusion(ChunkAppendState *state)
ListCell *lc;
Scan *scan = chunk_append_get_scan_plan(lfirst(lc_plan));
i++;
/*
* If this is a base rel (chunk), check if it can be
* excluded from the scan. Otherwise, fall through.
@@ -159,7 +166,12 @@ do_startup_exclusion(ChunkAppendState *state)
restrictinfos = constify_restrictinfos(&root, restrictinfos);
if (can_exclude_chunk(lfirst(lc_constraints), restrictinfos))
{
if (i < state->first_partial_plan)
filtered_first_partial_plan--;
continue;
}
/*
* if this node does runtime exclusion we keep the constified
@@ -185,6 +197,7 @@ do_startup_exclusion(ChunkAppendState *state)
state->filtered_subplans = filtered_children;
state->filtered_ri_clauses = filtered_ri_clauses;
state->filtered_constraints = filtered_constraints;
state->filtered_first_partial_plan = filtered_first_partial_plan;
}
/*
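The do_startup_exclusion hunks above keep that boundary consistent during startup exclusion: whenever a chunk that sits before first_partial_plan is excluded, the boundary has to move down by one so that the surviving non-partial subplans are still counted correctly. Below is a self-contained sketch of that bookkeeping, using made-up names (filter_subplans, excluded[]) rather than the executor's real lists.

/* Hypothetical, standalone sketch of the boundary adjustment performed in
 * do_startup_exclusion: drop excluded subplans and decrement the boundary
 * for every excluded entry that sat in the non-partial range. */
#include <stdbool.h>
#include <stdio.h>

static int
filter_subplans(const bool *excluded, int num_subplans, int first_partial,
				int *kept_indexes, int *new_first_partial)
{
	int kept = 0;
	int boundary = first_partial;
	int i;

	for (i = 0; i < num_subplans; i++)
	{
		if (excluded[i])
		{
			if (i < first_partial)
				boundary--;		/* a non-partial subplan disappeared */
			continue;
		}
		kept_indexes[kept++] = i;
	}
	*new_first_partial = boundary;
	return kept;
}

int
main(void)
{
	/* 4 subplans, the first 2 non-partial; subplan 0 is excluded at startup */
	bool excluded[4] = {true, false, false, false};
	int kept_indexes[4];
	int new_first_partial;
	int kept = filter_subplans(excluded, 4, 2, kept_indexes, &new_first_partial);

	printf("kept %d subplans, first partial plan is now index %d\n",
		   kept, new_first_partial);
	return 0;
}

Running this with subplan 0 excluded prints "kept 3 subplans, first partial plan is now index 1".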
@@ -489,6 +502,13 @@ choose_next_subplan_for_worker(ChunkAppendState *state)
Assert(next_plan >= 0 && next_plan < state->num_subplans);
state->current = next_plan;
/*
* if this is not a partial plan we mark it as finished
* immediately so it does not get assigned another worker
*/
if (next_plan < state->filtered_first_partial_plan)
pstate->finished[next_plan] = true;
/* advance next_plan for next worker */
pstate->next_plan = get_next_subplan(state, state->current);
/*

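The choose_next_subplan_for_worker hunk above is where mixed plans matter at execution time: a non-partial subplan may only ever be handed to a single process, so it is marked finished in the shared parallel state the moment it is assigned, while partial subplans stay available to every worker. Below is a rough, standalone simulation of that hand-out policy; the real executor additionally serializes access with a lock, wraps workers back around to the remaining partial subplans, and honors runtime exclusion, none of which is modelled here, and all names are invented for the sketch.

#include <stdbool.h>
#include <stdio.h>

#define NUM_SUBPLANS 4
#define INVALID_SUBPLAN_INDEX (-1)

/* stand-ins for the coordination state the real node keeps in shared memory */
static bool finished[NUM_SUBPLANS];
static int	next_plan = 0;
static const int first_partial_plan = 2;	/* subplans 0 and 1 are non-partial */

/* Hand one subplan to the calling worker.  A non-partial subplan is marked
 * finished as soon as it is assigned, so no second worker can pick it up;
 * partial subplans remain available to everyone. */
static int
choose_for_worker(void)
{
	int plan = next_plan;
	int i;

	if (plan == INVALID_SUBPLAN_INDEX)
		return INVALID_SUBPLAN_INDEX;

	if (plan < first_partial_plan)
		finished[plan] = true;

	/* advance next_plan to the next unfinished subplan (no wrap-around here) */
	next_plan = INVALID_SUBPLAN_INDEX;
	for (i = plan + 1; i < NUM_SUBPLANS; i++)
	{
		if (!finished[i])
		{
			next_plan = i;
			break;
		}
	}
	return plan;
}

int
main(void)
{
	int worker;

	for (worker = 0; worker < 3; worker++)
		printf("worker %d starts on subplan %d\n", worker, choose_for_worker());
	return 0;
}

With two non-partial subplans ahead of two partial ones, three workers get subplans 0, 1 and 2; the first two are marked finished immediately so they can never be assigned to another worker.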

@@ -25,6 +25,8 @@ typedef struct ChunkAppendState
MemoryContext exclusion_ctx;
int num_subplans;
int first_partial_plan;
int filtered_first_partial_plan;
int current;
Oid ht_reloid;


@@ -261,8 +261,10 @@ chunk_append_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path, L
if (capath->pushdown_limit && capath->limit_tuples > 0)
limit = capath->limit_tuples;
custom_private = list_make1(
list_make3_oid((Oid) capath->startup_exclusion, (Oid) capath->runtime_exclusion, limit));
custom_private = list_make1(list_make4_int(capath->startup_exclusion,
capath->runtime_exclusion,
limit,
capath->first_partial_path));
custom_private = lappend(custom_private, chunk_ri_clauses);
custom_private = lappend(custom_private, chunk_rt_indexes);
custom_private = lappend(custom_private, sort_options);


@@ -216,14 +216,6 @@ should_chunk_append(PlannerInfo *root, RelOptInfo *rel, Path *path, bool ordered
{
ListCell *lc;
#if PG11_GE
AppendPath *append = castNode(AppendPath, path);
/* ChunkAppend doesnt support mixing partial and non-partial paths */
if (append->path.parallel_aware && append->first_partial_path > 0)
return false;
#endif
foreach (lc, rel->baserestrictinfo)
{
RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);


@@ -11,6 +11,8 @@ SELECT
END AS "PREFIX",
'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE"
\gset
\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk
\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk
CREATE TABLE test (i int, j double precision, ts timestamp);
SELECT create_hypertable('test','i',chunk_time_interval:=500000);
NOTICE: adding not-null constraint to column "i"
@@ -339,6 +341,60 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0;
(1 row)
RESET max_parallel_workers_per_gather;
-- test partial and non-partial plans
-- these will not be parallel on PG < 11
ALTER TABLE :CHUNK1 SET (parallel_workers=0);
ALTER TABLE :CHUNK2 SET (parallel_workers=2);
:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------
Gather (actual rows=1 loops=1)
Workers Planned: 1
Workers Launched: 1
Single Copy: true
-> Aggregate (actual rows=1 loops=1)
-> Result (actual rows=599999 loops=1)
One-Time Filter: (length(version()) > 0)
-> Custom Scan (ChunkAppend) on test (actual rows=599999 loops=1)
Chunks excluded during startup: 0
-> Result (actual rows=99999 loops=1)
One-Time Filter: (length(version()) > 0)
-> Index Only Scan using _hyper_1_1_chunk_test_i_idx on _hyper_1_1_chunk (actual rows=99999 loops=1)
Index Cond: (i > 400000)
Heap Fetches: 0
-> Result (actual rows=500000 loops=1)
One-Time Filter: (length(version()) > 0)
-> Seq Scan on _hyper_1_2_chunk (actual rows=500000 loops=1)
Filter: (i > 400000)
(18 rows)
ALTER TABLE :CHUNK1 SET (parallel_workers=2);
ALTER TABLE :CHUNK2 SET (parallel_workers=0);
:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
Gather (actual rows=1 loops=1)
Workers Planned: 1
Workers Launched: 1
Single Copy: true
-> Aggregate (actual rows=1 loops=1)
-> Result (actual rows=600000 loops=1)
One-Time Filter: (length(version()) > 0)
-> Custom Scan (ChunkAppend) on test (actual rows=600000 loops=1)
Chunks excluded during startup: 0
-> Result (actual rows=500000 loops=1)
One-Time Filter: (length(version()) > 0)
-> Seq Scan on _hyper_1_1_chunk (actual rows=500000 loops=1)
Filter: (i < 600000)
-> Result (actual rows=100000 loops=1)
One-Time Filter: (length(version()) > 0)
-> Index Only Scan using _hyper_1_2_chunk_test_i_idx on _hyper_1_2_chunk (actual rows=100000 loops=1)
Index Cond: (i < 600000)
Heap Fetches: 0
(18 rows)
ALTER TABLE :CHUNK1 RESET (parallel_workers);
ALTER TABLE :CHUNK2 RESET (parallel_workers);
-- now() is not marked parallel safe in PostgreSQL < 12 so using now()
-- in a query will prevent parallelism but CURRENT_TIMESTAMP and
-- transaction_timestamp() are marked parallel safe


@@ -11,6 +11,8 @@ SELECT
END AS "PREFIX",
'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE"
\gset
\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk
\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk
CREATE TABLE test (i int, j double precision, ts timestamp);
SELECT create_hypertable('test','i',chunk_time_interval:=500000);
NOTICE: adding not-null constraint to column "i"
@@ -337,6 +339,60 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0;
(1 row)
RESET max_parallel_workers_per_gather;
-- test partial and non-partial plans
-- these will not be parallel on PG < 11
ALTER TABLE :CHUNK1 SET (parallel_workers=0);
ALTER TABLE :CHUNK2 SET (parallel_workers=2);
:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual rows=1 loops=1)
-> Gather (actual rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (actual rows=1 loops=3)
-> Result (actual rows=200000 loops=3)
One-Time Filter: (length(version()) > 0)
-> Parallel Custom Scan (ChunkAppend) on test (actual rows=200000 loops=3)
Chunks excluded during startup: 0
-> Result (actual rows=99999 loops=1)
One-Time Filter: (length(version()) > 0)
-> Index Only Scan using _hyper_1_1_chunk_test_i_idx on _hyper_1_1_chunk (actual rows=99999 loops=1)
Index Cond: (i > 400000)
Heap Fetches: 99999
-> Result (actual rows=250000 loops=2)
One-Time Filter: (length(version()) > 0)
-> Parallel Seq Scan on _hyper_1_2_chunk (actual rows=250000 loops=2)
Filter: (i > 400000)
(18 rows)
ALTER TABLE :CHUNK1 SET (parallel_workers=2);
ALTER TABLE :CHUNK2 SET (parallel_workers=0);
:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual rows=1 loops=1)
-> Gather (actual rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (actual rows=1 loops=3)
-> Result (actual rows=200000 loops=3)
One-Time Filter: (length(version()) > 0)
-> Parallel Custom Scan (ChunkAppend) on test (actual rows=200000 loops=3)
Chunks excluded during startup: 0
-> Result (actual rows=100000 loops=1)
One-Time Filter: (length(version()) > 0)
-> Index Only Scan using _hyper_1_2_chunk_test_i_idx on _hyper_1_2_chunk (actual rows=100000 loops=1)
Index Cond: (i < 600000)
Heap Fetches: 100000
-> Result (actual rows=250000 loops=2)
One-Time Filter: (length(version()) > 0)
-> Parallel Seq Scan on _hyper_1_1_chunk (actual rows=250000 loops=2)
Filter: (i < 600000)
(18 rows)
ALTER TABLE :CHUNK1 RESET (parallel_workers);
ALTER TABLE :CHUNK2 RESET (parallel_workers);
-- now() is not marked parallel safe in PostgreSQL < 12 so using now()
-- in a query will prevent parallelism but CURRENT_TIMESTAMP and
-- transaction_timestamp() are marked parallel safe


@@ -11,6 +11,8 @@ SELECT
END AS "PREFIX",
'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE"
\gset
\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk
\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk
CREATE TABLE test (i int, j double precision, ts timestamp);
SELECT create_hypertable('test','i',chunk_time_interval:=500000);
NOTICE: adding not-null constraint to column "i"
@@ -332,6 +334,50 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0;
(1 row)
RESET max_parallel_workers_per_gather;
-- test partial and non-partial plans
-- these will not be parallel on PG < 11
ALTER TABLE :CHUNK1 SET (parallel_workers=0);
ALTER TABLE :CHUNK2 SET (parallel_workers=2);
:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Aggregate
-> Result
One-Time Filter: (length(version()) > 0)
-> Custom Scan (ChunkAppend) on test
Chunks excluded during startup: 0
-> Result
One-Time Filter: (length(version()) > 0)
-> Index Only Scan using _hyper_1_1_chunk_test_i_idx on _hyper_1_1_chunk
Index Cond: (i > 400000)
-> Result
One-Time Filter: (length(version()) > 0)
-> Seq Scan on _hyper_1_2_chunk
Filter: (i > 400000)
(13 rows)
ALTER TABLE :CHUNK1 SET (parallel_workers=2);
ALTER TABLE :CHUNK2 SET (parallel_workers=0);
:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Aggregate
-> Result
One-Time Filter: (length(version()) > 0)
-> Custom Scan (ChunkAppend) on test
Chunks excluded during startup: 0
-> Result
One-Time Filter: (length(version()) > 0)
-> Seq Scan on _hyper_1_1_chunk
Filter: (i < 600000)
-> Result
One-Time Filter: (length(version()) > 0)
-> Index Only Scan using _hyper_1_2_chunk_test_i_idx on _hyper_1_2_chunk
Index Cond: (i < 600000)
(13 rows)
ALTER TABLE :CHUNK1 RESET (parallel_workers);
ALTER TABLE :CHUNK2 RESET (parallel_workers);
-- now() is not marked parallel safe in PostgreSQL < 12 so using now()
-- in a query will prevent parallelism but CURRENT_TIMESTAMP and
-- transaction_timestamp() are marked parallel safe


@@ -14,6 +14,9 @@ SELECT
'EXPLAIN (costs off)' AS "PREFIX_NO_ANALYZE"
\gset
\set CHUNK1 _timescaledb_internal._hyper_1_1_chunk
\set CHUNK2 _timescaledb_internal._hyper_1_2_chunk
CREATE TABLE test (i int, j double precision, ts timestamp);
SELECT create_hypertable('test','i',chunk_time_interval:=500000);
--has to be big enough to force at least 2 workers below.
@@ -91,6 +94,19 @@ SELECT count(*) FROM "test" WHERE i >= 500000 AND length(version()) > 0;
RESET max_parallel_workers_per_gather;
-- test partial and non-partial plans
-- these will not be parallel on PG < 11
ALTER TABLE :CHUNK1 SET (parallel_workers=0);
ALTER TABLE :CHUNK2 SET (parallel_workers=2);
:PREFIX SELECT count(*) FROM "test" WHERE i > 400000 AND length(version()) > 0;
ALTER TABLE :CHUNK1 SET (parallel_workers=2);
ALTER TABLE :CHUNK2 SET (parallel_workers=0);
:PREFIX SELECT count(*) FROM "test" WHERE i < 600000 AND length(version()) > 0;
ALTER TABLE :CHUNK1 RESET (parallel_workers);
ALTER TABLE :CHUNK2 RESET (parallel_workers);
-- now() is not marked parallel safe in PostgreSQL < 12 so using now()
-- in a query will prevent parallelism but CURRENT_TIMESTAMP and
-- transaction_timestamp() are marked parallel safe