Adjust hypertable insert for PG14

PG14 refactors the INSERT path and removes the result relation from
executor state which means plan nodes don't have easy access to
the current result relation and can no longer modify it. This patch
changes the chunk tuple routing for PG14 to pull in code from
ModifyTable which unfortunately is static and adjust it to allow
chunk tuple routing.
This commit is contained in:
Sven Klemm 2021-10-04 14:39:55 +02:00 committed by Sven Klemm
parent 9e53cbb6d3
commit 26c5f27683
9 changed files with 1835 additions and 93 deletions

View File

@ -50,7 +50,7 @@ get_modifytable(const ChunkDispatch *dispatch)
bool bool
ts_chunk_dispatch_has_returning(const ChunkDispatch *dispatch) ts_chunk_dispatch_has_returning(const ChunkDispatch *dispatch)
{ {
if (NULL == dispatch->dispatch_state) if (!dispatch->dispatch_state)
return false; return false;
return get_modifytable(dispatch)->returningLists != NIL; return get_modifytable(dispatch)->returningLists != NIL;
} }
@ -76,7 +76,7 @@ ts_chunk_dispatch_get_arbiter_indexes(const ChunkDispatch *dispatch)
OnConflictAction OnConflictAction
ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch) ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
{ {
if (NULL == dispatch->dispatch_state) if (!dispatch->dispatch_state)
return ONCONFLICT_NONE; return ONCONFLICT_NONE;
return get_modifytable(dispatch)->onConflictAction; return get_modifytable(dispatch)->onConflictAction;
} }

View File

@ -90,7 +90,7 @@ chunk_dispatch_exec(CustomScanState *node)
point = ts_hyperspace_calculate_point(ht->space, slot); point = ts_hyperspace_calculate_point(ht->space, slot);
/* Save the main table's (hypertable's) ResultRelInfo */ /* Save the main table's (hypertable's) ResultRelInfo */
if (NULL == dispatch->hypertable_result_rel_info) if (!dispatch->hypertable_result_rel_info)
{ {
#if PG14_LT #if PG14_LT
Assert(RelationGetRelid(estate->es_result_relation_info->ri_RelationDesc) == Assert(RelationGetRelid(estate->es_result_relation_info->ri_RelationDesc) ==

View File

@ -79,21 +79,19 @@ create_chunk_rri_constraint_expr(ResultRelInfo *rri, Relation rel)
* The ResultRelInfo holds the executor state (e.g., open relation, indexes, and * The ResultRelInfo holds the executor state (e.g., open relation, indexes, and
* options) for the result relation where tuples will be stored. * options) for the result relation where tuples will be stored.
* *
* The first ResultRelInfo in the executor's array (corresponding to the main * The Hypertable ResultRelInfo is used as a template for the chunk's new ResultRelInfo.
* table's) is used as a template for the chunk's new ResultRelInfo.
*/ */
static inline ResultRelInfo * static inline ResultRelInfo *
create_chunk_result_relation_info(ChunkDispatch *dispatch, Relation rel) create_chunk_result_relation_info(ChunkDispatch *dispatch, Relation rel)
{ {
ResultRelInfo *rri, *rri_orig; ResultRelInfo *rri;
Index hyper_rti = dispatch->hypertable_result_rel_info->ri_RangeTableIndex; ResultRelInfo *rri_orig = dispatch->hypertable_result_rel_info;
rri = palloc0(sizeof(ResultRelInfo)); Index hyper_rti = rri_orig->ri_RangeTableIndex;
NodeSetTag(rri, T_ResultRelInfo); rri = makeNode(ResultRelInfo);
InitResultRelInfo(rri, rel, hyper_rti, NULL, dispatch->estate->es_instrument); InitResultRelInfo(rri, rel, hyper_rti, NULL, dispatch->estate->es_instrument);
/* Copy options from the main table's (hypertable's) result relation info */ /* Copy options from the main table's (hypertable's) result relation info */
rri_orig = dispatch->hypertable_result_rel_info;
rri->ri_WithCheckOptions = rri_orig->ri_WithCheckOptions; rri->ri_WithCheckOptions = rri_orig->ri_WithCheckOptions;
rri->ri_WithCheckOptionExprs = rri_orig->ri_WithCheckOptionExprs; rri->ri_WithCheckOptionExprs = rri_orig->ri_WithCheckOptionExprs;
#if PG14_LT #if PG14_LT
@ -102,7 +100,7 @@ create_chunk_result_relation_info(ChunkDispatch *dispatch, Relation rel)
rri->ri_projectReturning = rri_orig->ri_projectReturning; rri->ri_projectReturning = rri_orig->ri_projectReturning;
rri->ri_FdwState = NULL; rri->ri_FdwState = NULL;
rri->ri_usesFdwDirectModify = dispatch->hypertable_result_rel_info->ri_usesFdwDirectModify; rri->ri_usesFdwDirectModify = rri_orig->ri_usesFdwDirectModify;
if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE) if (RelationGetForm(rel)->relkind == RELKIND_FOREIGN_TABLE)
rri->ri_FdwRoutine = GetFdwRoutineForRelation(rel, true); rri->ri_FdwRoutine = GetFdwRoutineForRelation(rel, true);
@ -372,7 +370,7 @@ setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
state->conflproj_tupdesc = get_default_confl_tupdesc(state, dispatch); state->conflproj_tupdesc = get_default_confl_tupdesc(state, dispatch);
state->conflproj_slot = get_default_confl_slot(state, dispatch); state->conflproj_slot = get_default_confl_slot(state, dispatch);
if (NULL != map) if (map)
{ {
ExprContext *econtext = hyper_rri->ri_onConflict->oc_ProjInfo->pi_exprContext; ExprContext *econtext = hyper_rri->ri_onConflict->oc_ProjInfo->pi_exprContext;
Node *onconflict_where = ts_chunk_dispatch_get_on_conflict_where(dispatch); Node *onconflict_where = ts_chunk_dispatch_get_on_conflict_where(dispatch);
@ -380,7 +378,7 @@ setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
Assert(map->outdesc == RelationGetDescr(chunk_rel)); Assert(map->outdesc == RelationGetDescr(chunk_rel));
if (NULL == chunk_map) if (!chunk_map)
chunk_map = convert_tuples_by_name_compat(RelationGetDescr(chunk_rel), chunk_map = convert_tuples_by_name_compat(RelationGetDescr(chunk_rel),
RelationGetDescr(hyper_rel), RelationGetDescr(hyper_rel),
gettext_noop("could not convert row type")); gettext_noop("could not convert row type"));
@ -664,6 +662,10 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
RelationGetDescr(rel), RelationGetDescr(rel),
gettext_noop("could not convert row type")); gettext_noop("could not convert row type"));
// relinfo->ri_RootToPartitionMap = state->hyper_to_chunk_map;
// relinfo->ri_PartitionTupleSlot = table_slot_create(relinfo->ri_RelationDesc,
// &state->estate->es_tupleTable);
adjust_projections(state, dispatch, RelationGetForm(rel)->reltype); adjust_projections(state, dispatch, RelationGetForm(rel)->reltype);
if (has_compressed_chunk) if (has_compressed_chunk)
@ -701,7 +703,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
RangeTblEntry *rte = RangeTblEntry *rte =
rt_fetch(resrelinfo->ri_RangeTableIndex, dispatch->estate->es_range_table); rt_fetch(resrelinfo->ri_RangeTableIndex, dispatch->estate->es_range_table);
Assert(NULL != rte); Assert(rte != NULL);
state->user_id = OidIsValid(rte->checkAsUser) ? rte->checkAsUser : GetUserId(); state->user_id = OidIsValid(rte->checkAsUser) ? rte->checkAsUser : GetUserId();
state->chunk_data_nodes = ts_chunk_data_nodes_copy(chunk); state->chunk_data_nodes = ts_chunk_data_nodes_copy(chunk);
@ -715,8 +717,8 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
* to insert into. */ * to insert into. */
resrelinfo->ri_FdwState = state; resrelinfo->ri_FdwState = state;
} }
else if (NULL != resrelinfo->ri_FdwRoutine && !resrelinfo->ri_usesFdwDirectModify && else if (resrelinfo->ri_FdwRoutine && !resrelinfo->ri_usesFdwDirectModify &&
NULL != resrelinfo->ri_FdwRoutine->BeginForeignModify) resrelinfo->ri_FdwRoutine->BeginForeignModify)
{ {
/* /*
* If this is a chunk located one or more data nodes, setup the * If this is a chunk located one or more data nodes, setup the
@ -757,8 +759,7 @@ ts_chunk_insert_state_destroy(ChunkInsertState *state)
{ {
ResultRelInfo *rri = state->result_relation_info; ResultRelInfo *rri = state->result_relation_info;
if (NULL != rri->ri_FdwRoutine && !rri->ri_usesFdwDirectModify && if (rri->ri_FdwRoutine && !rri->ri_usesFdwDirectModify && rri->ri_FdwRoutine->EndForeignModify)
NULL != rri->ri_FdwRoutine->EndForeignModify)
rri->ri_FdwRoutine->EndForeignModify(state->estate, rri); rri->ri_FdwRoutine->EndForeignModify(state->estate, rri);
destroy_on_conflict_state(state); destroy_on_conflict_state(state);
@ -789,7 +790,7 @@ ts_chunk_insert_state_destroy(ChunkInsertState *state)
} }
table_close(state->rel, NoLock); table_close(state->rel, NoLock);
if (NULL != state->slot) if (state->slot)
ExecDropSingleTupleTableSlot(state->slot); ExecDropSingleTupleTableSlot(state->slot);
/* /*
@ -844,7 +845,7 @@ ts_chunk_insert_state_destroy(ChunkInsertState *state)
* of the CIS (if not already deleted), then the per_tuple context, followed * of the CIS (if not already deleted), then the per_tuple context, followed
* by the CIS again (via the callback), and thus a crash. * by the CIS again (via the callback), and thus a crash.
*/ */
if (state->estate->es_per_tuple_exprcontext != NULL) if (state->estate->es_per_tuple_exprcontext)
MemoryContextSetParent(state->mctx, MemoryContextSetParent(state->mctx,
state->estate->es_per_tuple_exprcontext->ecxt_per_tuple_memory); state->estate->es_per_tuple_exprcontext->ecxt_per_tuple_memory);
else else

File diff suppressed because it is too large Load Diff

View File

@ -316,7 +316,7 @@ SELECT 1 \g | grep -v "Planning" | grep -v "Execution"
------------------------------------------------------------------------- -------------------------------------------------------------------------
Result (actual rows=1 loops=1) Result (actual rows=1 loops=1)
CTE insert_cte CTE insert_cte
-> Custom Scan (HypertableInsert) (never executed) -> Custom Scan (HypertableInsert) (actual rows=0 loops=1)
-> Insert on one_space_test (actual rows=0 loops=1) -> Insert on one_space_test (actual rows=0 loops=1)
-> Custom Scan (ChunkDispatch) (actual rows=1 loops=1) -> Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1) -> Result (actual rows=1 loops=1)

View File

@ -316,7 +316,7 @@ SELECT 1 \g | grep -v "Planning" | grep -v "Execution"
------------------------------------------------------------------------- -------------------------------------------------------------------------
Result (actual rows=1 loops=1) Result (actual rows=1 loops=1)
CTE insert_cte CTE insert_cte
-> Custom Scan (HypertableInsert) (never executed) -> Custom Scan (HypertableInsert) (actual rows=0 loops=1)
-> Insert on one_space_test (actual rows=0 loops=1) -> Insert on one_space_test (actual rows=0 loops=1)
-> Custom Scan (ChunkDispatch) (actual rows=1 loops=1) -> Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1) -> Result (actual rows=1 loops=1)

View File

@ -316,8 +316,8 @@ SELECT 1 \g | grep -v "Planning" | grep -v "Execution"
------------------------------------------------------------------------- -------------------------------------------------------------------------
Result (actual rows=1 loops=1) Result (actual rows=1 loops=1)
CTE insert_cte CTE insert_cte
-> Custom Scan (HypertableInsert) (never executed) -> Custom Scan (HypertableInsert) (actual rows=0 loops=1)
-> Insert on one_space_test (actual rows=0 loops=1) -> Insert on one_space_test (never executed)
-> Custom Scan (ChunkDispatch) (actual rows=1 loops=1) -> Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1) -> Result (actual rows=1 loops=1)
(8 rows) (8 rows)

View File

@ -83,72 +83,31 @@ SET series_1 = 8
WHERE series_1 IN (SELECT series_1 FROM "one_Partition" WHERE series_1 > series_val()); WHERE series_1 IN (SELECT series_1 FROM "one_Partition" WHERE series_1 > series_val());
QUERY PLAN QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------- -----------------------------------------------------------------------------------------------------------------------------------------------
Update on "one_Partition" "one_Partition_1" Update on "one_Partition"
Update on "one_Partition" "one_Partition_1" Update on "one_Partition" "one_Partition_2"
Update on _hyper_1_1_chunk "one_Partition_2" Update on _hyper_1_1_chunk "one_Partition_3"
Update on _hyper_1_2_chunk "one_Partition_3" Update on _hyper_1_2_chunk "one_Partition_4"
Update on _hyper_1_3_chunk "one_Partition_4" Update on _hyper_1_3_chunk "one_Partition_5"
-> Hash Join -> Hash Join
Hash Cond: ("one_Partition".series_1 = "one_Partition_1".series_1) Hash Cond: ("one_Partition".series_1 = "one_Partition_1".series_1)
-> HashAggregate -> Append
Group Key: "one_Partition".series_1 -> Seq Scan on "one_Partition" "one_Partition_2"
-> Append -> Seq Scan on _hyper_1_1_chunk "one_Partition_3"
-> Seq Scan on "one_Partition" "one_Partition_5" -> Seq Scan on _hyper_1_2_chunk "one_Partition_4"
Filter: (series_1 > (series_val())::double precision) -> Seq Scan on _hyper_1_3_chunk "one_Partition_5"
-> Index Scan using "_hyper_1_1_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_1_chunk "one_Partition_6"
Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_2_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_2_chunk "one_Partition_7"
Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_3_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_3_chunk "one_Partition_8"
Index Cond: (series_1 > (series_val())::double precision)
-> Hash
-> Seq Scan on "one_Partition" "one_Partition_1"
-> Hash Join
Hash Cond: ("one_Partition_2".series_1 = "one_Partition".series_1)
-> Seq Scan on _hyper_1_1_chunk "one_Partition_2"
-> Hash -> Hash
-> HashAggregate -> HashAggregate
Group Key: "one_Partition".series_1 Group Key: "one_Partition_1".series_1
-> Append -> Append
-> Seq Scan on "one_Partition" "one_Partition_5" -> Seq Scan on "one_Partition" "one_Partition_6"
Filter: (series_1 > (series_val())::double precision) Filter: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_1_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_1_chunk "one_Partition_6" -> Index Scan using "_hyper_1_1_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_1_chunk "one_Partition_7"
Index Cond: (series_1 > (series_val())::double precision) Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_2_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_2_chunk "one_Partition_7" -> Index Scan using "_hyper_1_2_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_2_chunk "one_Partition_8"
Index Cond: (series_1 > (series_val())::double precision) Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_3_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_3_chunk "one_Partition_8" -> Index Scan using "_hyper_1_3_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_3_chunk "one_Partition_9"
Index Cond: (series_1 > (series_val())::double precision) Index Cond: (series_1 > (series_val())::double precision)
-> Hash Join (24 rows)
Hash Cond: ("one_Partition_3".series_1 = "one_Partition".series_1)
-> Seq Scan on _hyper_1_2_chunk "one_Partition_3"
-> Hash
-> HashAggregate
Group Key: "one_Partition".series_1
-> Append
-> Seq Scan on "one_Partition" "one_Partition_5"
Filter: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_1_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_1_chunk "one_Partition_6"
Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_2_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_2_chunk "one_Partition_7"
Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_3_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_3_chunk "one_Partition_8"
Index Cond: (series_1 > (series_val())::double precision)
-> Hash Join
Hash Cond: ("one_Partition_4".series_1 = "one_Partition".series_1)
-> Seq Scan on _hyper_1_3_chunk "one_Partition_4"
-> Hash
-> HashAggregate
Group Key: "one_Partition".series_1
-> Append
-> Seq Scan on "one_Partition" "one_Partition_5"
Filter: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_1_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_1_chunk "one_Partition_6"
Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_2_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_2_chunk "one_Partition_7"
Index Cond: (series_1 > (series_val())::double precision)
-> Index Scan using "_hyper_1_3_chunk_one_Partition_timeCustom_series_1_idx" on _hyper_1_3_chunk "one_Partition_8"
Index Cond: (series_1 > (series_val())::double precision)
(65 rows)
SELECT * FROM "one_Partition" ORDER BY "timeCustom", device_id, series_0, series_1, series_2; SELECT * FROM "one_Partition" ORDER BY "timeCustom", device_id, series_0, series_1, series_2;
timeCustom | device_id | series_0 | series_1 | series_2 | series_bool timeCustom | device_id | series_0 | series_1 | series_2 | series_bool

View File

@ -145,7 +145,7 @@ UPDATE jit_test_interval SET temp = temp * 2.3 WHERE id >= 23 and id < 73;
------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------
Update on public.jit_test_interval Update on public.jit_test_interval
-> Index Scan using jit_test_interval_id_idx on public.jit_test_interval -> Index Scan using jit_test_interval_id_idx on public.jit_test_interval
Output: id, (temp * '2.3'::double precision), ctid Output: (temp * '2.3'::double precision), ctid
Index Cond: ((jit_test_interval.id >= 23) AND (jit_test_interval.id < 73)) Index Cond: ((jit_test_interval.id >= 23) AND (jit_test_interval.id < 73))
(4 rows) (4 rows)