mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 19:13:16 +08:00
Fix UPSERT for PG14
Adjust code to upstream UPDATE/DELETE changes. PG14 refactored the way UPDATEs and DELETEs are planned and executed. https://github.com/postgres/postgres/commit/86dc90056d
This commit is contained in:
parent
bfe3603d57
commit
8d12b28a00
@ -113,7 +113,7 @@ pg14_debug_latest = {
|
||||
"pg": PG14_LATEST,
|
||||
"tsdb_build_args": "-DWARNINGS_AS_ERRORS=OFF -DEXPERIMENTAL=ON",
|
||||
"coverage": False,
|
||||
"installcheck_args": "IGNORES='dist_hypertable-14 dist_partial_agg dist_query dist_triggers insert-14 upsert'"
|
||||
"installcheck_args": "IGNORES='dist_hypertable-14 dist_partial_agg dist_query'"
|
||||
}
|
||||
m["include"].append(build_debug_config(pg14_debug_latest))
|
||||
|
||||
|
@ -87,12 +87,6 @@ ts_chunk_dispatch_get_on_conflict_set(const ChunkDispatch *dispatch)
|
||||
return get_modifytable(dispatch)->onConflictSet;
|
||||
}
|
||||
|
||||
Node *
|
||||
ts_chunk_dispatch_get_on_conflict_where(const ChunkDispatch *dispatch)
|
||||
{
|
||||
return get_modifytable(dispatch)->onConflictWhere;
|
||||
}
|
||||
|
||||
CmdType
|
||||
ts_chunk_dispatch_get_cmd_type(const ChunkDispatch *dispatch)
|
||||
{
|
||||
|
@ -54,7 +54,6 @@ extern List *ts_chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispat
|
||||
extern List *ts_chunk_dispatch_get_arbiter_indexes(const ChunkDispatch *dispatch);
|
||||
extern OnConflictAction ts_chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch);
|
||||
extern List *ts_chunk_dispatch_get_on_conflict_set(const ChunkDispatch *dispatch);
|
||||
extern Node *ts_chunk_dispatch_get_on_conflict_where(const ChunkDispatch *dispatch);
|
||||
extern CmdType ts_chunk_dispatch_get_cmd_type(const ChunkDispatch *dispatch);
|
||||
|
||||
#endif /* TIMESCALEDB_CHUNK_DISPATCH_H */
|
||||
|
@ -198,6 +198,7 @@ translate_clause(List *inclause, TupleConversionMap *chunk_map, Index varno, Rel
|
||||
return clause;
|
||||
}
|
||||
|
||||
#if PG14_LT
|
||||
/*
|
||||
* adjust_hypertable_tlist - from Postgres source code `adjust_partition_tlist`
|
||||
* Adjust the targetlist entries for a given chunk to account for
|
||||
@ -267,6 +268,40 @@ adjust_hypertable_tlist(List *tlist, TupleConversionMap *map)
|
||||
}
|
||||
return new_tlist;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if PG14_GE
|
||||
/*
|
||||
* adjust_chunk_colnos
|
||||
* Adjust the list of UPDATE target column numbers to account for
|
||||
* attribute differences between the parent and the partition.
|
||||
*
|
||||
* adapted from postgres adjust_partition_colnos
|
||||
*/
|
||||
static List *
|
||||
adjust_chunk_colnos(List *colnos, ResultRelInfo *chunk_rri)
|
||||
{
|
||||
List *new_colnos = NIL;
|
||||
TupleConversionMap *map = ExecGetChildToRootMap(chunk_rri);
|
||||
AttrMap *attrMap;
|
||||
ListCell *lc;
|
||||
|
||||
Assert(map != NULL); /* else we shouldn't be here */
|
||||
attrMap = map->attrMap;
|
||||
|
||||
foreach (lc, colnos)
|
||||
{
|
||||
AttrNumber parentattrno = lfirst_int(lc);
|
||||
|
||||
if (parentattrno <= 0 || parentattrno > attrMap->maplen ||
|
||||
attrMap->attnums[parentattrno - 1] == 0)
|
||||
elog(ERROR, "unexpected attno %d in target column list", parentattrno);
|
||||
new_colnos = lappend_int(new_colnos, attrMap->attnums[parentattrno - 1]);
|
||||
}
|
||||
|
||||
return new_colnos;
|
||||
}
|
||||
#endif
|
||||
|
||||
static inline ResultRelInfo *
|
||||
get_chunk_rri(ChunkInsertState *state)
|
||||
@ -280,70 +315,6 @@ get_hyper_rri(ChunkDispatch *dispatch)
|
||||
return dispatch->hypertable_result_rel_info;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create the ON CONFLICT state for a chunk.
|
||||
*
|
||||
* The hypertable root is used as a template. A shallow copy can be made,
|
||||
* e.g., if tuple descriptors match exactly.
|
||||
*/
|
||||
static void
|
||||
init_basic_on_conflict_state(ResultRelInfo *hyper_rri, ResultRelInfo *chunk_rri)
|
||||
{
|
||||
OnConflictSetState *onconfl = makeNode(OnConflictSetState);
|
||||
|
||||
/* If no tuple conversion between the chunk and root hyper relation is
|
||||
* needed, we can get away with a (mostly) shallow copy */
|
||||
memcpy(onconfl, hyper_rri->ri_onConflict, sizeof(OnConflictSetState));
|
||||
|
||||
chunk_rri->ri_onConflict = onconfl;
|
||||
}
|
||||
|
||||
static ExprState *
|
||||
create_on_conflict_where_qual(List *clause)
|
||||
{
|
||||
return ExecInitQual(clause, NULL);
|
||||
}
|
||||
|
||||
static TupleDesc
|
||||
get_default_confl_tupdesc(ChunkInsertState *state, ChunkDispatch *dispatch)
|
||||
{
|
||||
return get_hyper_rri(dispatch)->ri_onConflict->oc_ProjSlot->tts_tupleDescriptor;
|
||||
}
|
||||
|
||||
static TupleTableSlot *
|
||||
get_default_confl_slot(ChunkInsertState *state, ChunkDispatch *dispatch)
|
||||
{
|
||||
return get_hyper_rri(dispatch)->ri_onConflict->oc_ProjSlot;
|
||||
}
|
||||
|
||||
static TupleTableSlot *
|
||||
get_confl_slot(ChunkInsertState *state, ChunkDispatch *dispatch, TupleDesc projtupdesc)
|
||||
{
|
||||
ResultRelInfo *chunk_rri = get_chunk_rri(state);
|
||||
|
||||
/* PG12 has a per-relation projection slot for ON CONFLICT. Usually,
|
||||
* these slots are tied to the executor's tuple table
|
||||
* (estate->es_tupleTable), which tracks all slots and cleans them up
|
||||
* at the end of exection. This doesn't work well in our case, since
|
||||
* chunk insert states do not necessarily live to the end of execution
|
||||
* (in order to keep memory usage down when inserting into lots of
|
||||
* chunks). Therefore, we do NOT tie these slots to the executor
|
||||
* state, and instead manage their lifecycles ourselves. */
|
||||
chunk_rri->ri_onConflict->oc_ProjSlot = MakeSingleTupleTableSlot(projtupdesc, &TTSOpsVirtual);
|
||||
|
||||
return chunk_rri->ri_onConflict->oc_ProjSlot;
|
||||
}
|
||||
|
||||
static TupleTableSlot *
|
||||
get_default_existing_slot(ChunkInsertState *state, ChunkDispatch *dispatch)
|
||||
{
|
||||
ResultRelInfo *chunk_rri = get_chunk_rri(state);
|
||||
|
||||
chunk_rri->ri_onConflict->oc_Existing = table_slot_create(state->rel, NULL);
|
||||
|
||||
return chunk_rri->ri_onConflict->oc_Existing;
|
||||
}
|
||||
|
||||
/*
|
||||
* Setup ON CONFLICT state for a chunk.
|
||||
*
|
||||
@ -356,25 +327,70 @@ setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
|
||||
TupleConversionMap *chunk_map)
|
||||
{
|
||||
TupleConversionMap *map = state->hyper_to_chunk_map;
|
||||
ResultRelInfo *chunk_rri = get_chunk_rri(state);
|
||||
ResultRelInfo *hyper_rri = get_hyper_rri(dispatch);
|
||||
ResultRelInfo *chunk_rri = state->result_relation_info;
|
||||
ResultRelInfo *hyper_rri = dispatch->hypertable_result_rel_info;
|
||||
Relation chunk_rel = state->result_relation_info->ri_RelationDesc;
|
||||
Relation hyper_rel = dispatch->hypertable_result_rel_info->ri_RelationDesc;
|
||||
Relation hyper_rel = hyper_rri->ri_RelationDesc;
|
||||
ModifyTableState *mtstate = castNode(ModifyTableState, dispatch->dispatch_state->mtstate);
|
||||
ModifyTable *mt = castNode(ModifyTable, mtstate->ps.plan);
|
||||
|
||||
Assert(ts_chunk_dispatch_get_on_conflict_action(dispatch) == ONCONFLICT_UPDATE);
|
||||
init_basic_on_conflict_state(hyper_rri, chunk_rri);
|
||||
|
||||
/* Setup default slots for ON CONFLICT handling, in case of no tuple
|
||||
* conversion. */
|
||||
state->existing_slot = get_default_existing_slot(state, dispatch);
|
||||
state->conflproj_tupdesc = get_default_confl_tupdesc(state, dispatch);
|
||||
state->conflproj_slot = get_default_confl_slot(state, dispatch);
|
||||
OnConflictSetState *onconfl = makeNode(OnConflictSetState);
|
||||
memcpy(onconfl, hyper_rri->ri_onConflict, sizeof(OnConflictSetState));
|
||||
chunk_rri->ri_onConflict = onconfl;
|
||||
|
||||
if (map)
|
||||
#if PG14_GE
|
||||
chunk_rri->ri_RootToPartitionMap = map;
|
||||
#endif
|
||||
|
||||
Assert(mt->onConflictSet);
|
||||
Assert(hyper_rri->ri_onConflict != NULL);
|
||||
|
||||
/*
|
||||
* Need a separate existing slot for each partition, as the
|
||||
* partition could be of a different AM, even if the tuple
|
||||
* descriptors match.
|
||||
*/
|
||||
onconfl->oc_Existing = table_slot_create(chunk_rri->ri_RelationDesc, NULL);
|
||||
state->existing_slot = onconfl->oc_Existing;
|
||||
|
||||
/*
|
||||
* If the chunk's tuple descriptor matches exactly the hypertable
|
||||
* (the common case), we can re-use most of the parent's ON
|
||||
* CONFLICT SET state, skipping a bunch of work. Otherwise, we
|
||||
* need to create state specific to this partition.
|
||||
*/
|
||||
if (!map)
|
||||
{
|
||||
/*
|
||||
* It's safe to reuse these from the hypertable, as we
|
||||
* only process one tuple at a time (therefore we won't
|
||||
* overwrite needed data in slots), and the results of
|
||||
* projections are independent of the underlying storage.
|
||||
* Projections and where clauses themselves don't store state
|
||||
* / are independent of the underlying storage.
|
||||
*/
|
||||
onconfl->oc_ProjSlot = hyper_rri->ri_onConflict->oc_ProjSlot;
|
||||
onconfl->oc_ProjInfo = hyper_rri->ri_onConflict->oc_ProjInfo;
|
||||
onconfl->oc_WhereClause = hyper_rri->ri_onConflict->oc_WhereClause;
|
||||
state->conflproj_slot = onconfl->oc_ProjSlot;
|
||||
}
|
||||
else
|
||||
{
|
||||
ExprContext *econtext = hyper_rri->ri_onConflict->oc_ProjInfo->pi_exprContext;
|
||||
Node *onconflict_where = ts_chunk_dispatch_get_on_conflict_where(dispatch);
|
||||
List *onconflset;
|
||||
#if PG14_GE
|
||||
List *onconflcols;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Translate expressions in onConflictSet to account for
|
||||
* different attribute numbers. For that, map partition
|
||||
* varattnos twice: first to catch the EXCLUDED
|
||||
* pseudo-relation (INNER_VAR), and second to handle the main
|
||||
* target relation (firstVarno).
|
||||
*/
|
||||
onconflset = copyObject(mt->onConflictSet);
|
||||
|
||||
Assert(map->outdesc == RelationGetDescr(chunk_rel));
|
||||
|
||||
@ -383,30 +399,53 @@ setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
|
||||
RelationGetDescr(hyper_rel),
|
||||
gettext_noop("could not convert row type"));
|
||||
|
||||
onconflset = translate_clause(ts_chunk_dispatch_get_on_conflict_set(dispatch),
|
||||
onconflset = translate_clause(onconflset,
|
||||
chunk_map,
|
||||
hyper_rri->ri_RangeTableIndex,
|
||||
hyper_rel,
|
||||
chunk_rel);
|
||||
|
||||
#if PG14_LT
|
||||
onconflset = adjust_hypertable_tlist(onconflset, state->hyper_to_chunk_map);
|
||||
#else
|
||||
chunk_rri->ri_ChildToRootMap = chunk_map;
|
||||
chunk_rri->ri_ChildToRootMapValid = true;
|
||||
|
||||
/* Finally, adjust the target colnos to match the chunk. */
|
||||
if (chunk_map)
|
||||
onconflcols = adjust_chunk_colnos(mt->onConflictCols, chunk_rri);
|
||||
else
|
||||
onconflcols = mt->onConflictCols;
|
||||
#endif
|
||||
|
||||
/* create the tuple slot for the UPDATE SET projection */
|
||||
state->conflproj_tupdesc = ExecTypeFromTL(onconflset);
|
||||
state->conflproj_slot = get_confl_slot(state, dispatch, state->conflproj_tupdesc);
|
||||
onconfl->oc_ProjSlot = table_slot_create(chunk_rel, NULL);
|
||||
state->conflproj_slot = onconfl->oc_ProjSlot;
|
||||
|
||||
/* build UPDATE SET projection state */
|
||||
chunk_rri->ri_onConflict->oc_ProjInfo =
|
||||
ExecBuildProjectionInfo(onconflset,
|
||||
econtext,
|
||||
state->conflproj_slot,
|
||||
NULL,
|
||||
RelationGetDescr(chunk_rel));
|
||||
#if PG14_LT
|
||||
ExprContext *econtext = hyper_rri->ri_onConflict->oc_ProjInfo->pi_exprContext;
|
||||
onconfl->oc_ProjInfo = ExecBuildProjectionInfo(onconflset,
|
||||
econtext,
|
||||
state->conflproj_slot,
|
||||
NULL,
|
||||
RelationGetDescr(chunk_rel));
|
||||
#else
|
||||
onconfl->oc_ProjInfo = ExecBuildUpdateProjection(onconflset,
|
||||
true,
|
||||
onconflcols,
|
||||
RelationGetDescr(chunk_rel),
|
||||
mtstate->ps.ps_ExprContext,
|
||||
onconfl->oc_ProjSlot,
|
||||
&mtstate->ps);
|
||||
#endif
|
||||
|
||||
Node *onconflict_where = mt->onConflictWhere;
|
||||
|
||||
/*
|
||||
* Map attribute numbers in the WHERE clause, if it exists.
|
||||
*/
|
||||
if (NULL != onconflict_where)
|
||||
if (onconflict_where && chunk_map)
|
||||
{
|
||||
List *clause = translate_clause(castNode(List, onconflict_where),
|
||||
chunk_map,
|
||||
@ -414,7 +453,7 @@ setup_on_conflict_state(ChunkInsertState *state, ChunkDispatch *dispatch,
|
||||
hyper_rel,
|
||||
chunk_rel);
|
||||
|
||||
chunk_rri->ri_onConflict->oc_WhereClause = create_on_conflict_where_qual(clause);
|
||||
chunk_rri->ri_onConflict->oc_WhereClause = ExecInitQual(clause, NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,8 +38,6 @@ typedef struct ChunkInsertState
|
||||
* tuple descriptor on the slots when switching between chunks.
|
||||
*/
|
||||
|
||||
/* Tuple descriptor for projected tuple in ON CONFLICT handling */
|
||||
TupleDesc conflproj_tupdesc;
|
||||
/* Pointer to slot for projected tuple in ON CONFLICT handling */
|
||||
TupleTableSlot *conflproj_slot;
|
||||
/* Pointer to slot for tuple replaced in ON CONFLICT DO UPDATE
|
||||
|
@ -54,8 +54,6 @@ static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resul
|
||||
ItemPointer conflictTid, TupleTableSlot *planSlot,
|
||||
TupleTableSlot *excludedSlot, EState *estate, bool canSetTag,
|
||||
TupleTableSlot **returning);
|
||||
static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate, EState *estate,
|
||||
ResultRelInfo *targetRelInfo, TupleTableSlot *slot);
|
||||
static void ExecCheckTupleVisible(EState *estate, Relation rel, TupleTableSlot *slot);
|
||||
static void ExecCheckTIDVisible(EState *estate, ResultRelInfo *relinfo, ItemPointer tid,
|
||||
TupleTableSlot *tempSlot);
|
||||
@ -400,13 +398,11 @@ hypertable_insert_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *be
|
||||
{
|
||||
HypertableInsertPath *hipath = (HypertableInsertPath *) best_path;
|
||||
CustomScan *cscan = makeNode(CustomScan);
|
||||
ModifyTable *mt = linitial(custom_plans);
|
||||
ModifyTable *mt = linitial_node(ModifyTable, custom_plans);
|
||||
FdwRoutine *fdwroutine = NULL;
|
||||
|
||||
Assert(IsA(mt, ModifyTable));
|
||||
|
||||
cscan->methods = &hypertable_insert_plan_methods;
|
||||
cscan->custom_plans = list_make1(mt);
|
||||
cscan->custom_plans = custom_plans;
|
||||
cscan->scan.scanrelid = 0;
|
||||
|
||||
/* Copy costs, etc., from the original plan */
|
||||
@ -515,6 +511,18 @@ ts_hypertable_insert_path_create(PlannerInfo *root, ModifyTablePath *mtpath)
|
||||
elog(ERROR, "multiple top-level subpaths found during INSERT");
|
||||
#endif
|
||||
|
||||
#if PG14_GE
|
||||
/* PG14 only copies child rows and width if returningLists is not
|
||||
* empty. Since we do not know target chunks during planning we
|
||||
* do not have that information when postgres creates the path.
|
||||
*/
|
||||
if (mtpath->returningLists == NIL)
|
||||
{
|
||||
mtpath->path.rows = mtpath->subpath->rows;
|
||||
mtpath->path.pathtarget->width = mtpath->subpath->pathtarget->width;
|
||||
}
|
||||
#endif
|
||||
|
||||
Index rti = linitial_int(mtpath->resultRelations);
|
||||
RangeTblEntry *rte = planner_rt_fetch(rti, root);
|
||||
|
||||
@ -1152,7 +1160,6 @@ exec_chunk_insert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, Tuple
|
||||
MemoryContext oldContext;
|
||||
|
||||
Assert(!mtstate->mt_partition_tuple_routing);
|
||||
slot = ExecPrepareTupleRouting(mtstate, estate, resultRelInfo, slot);
|
||||
|
||||
ExecMaterializeSlot(slot);
|
||||
|
||||
@ -2181,56 +2188,6 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo,
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecPrepareTupleRouting --- prepare for routing one tuple
|
||||
*
|
||||
* Determine the partition in which the tuple in slot is to be inserted,
|
||||
* and return its ResultRelInfo in *partRelInfo. The return value is
|
||||
* a slot holding the tuple of the partition rowtype.
|
||||
*
|
||||
* This also sets the transition table information in mtstate based on the
|
||||
* selected partition.
|
||||
*
|
||||
* based on function with same name from executor/nodeModifyTable.c
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
ExecPrepareTupleRouting(ModifyTableState *mtstate, EState *estate, ResultRelInfo *targetRelInfo,
|
||||
TupleTableSlot *slot)
|
||||
{
|
||||
TupleConversionMap *map;
|
||||
|
||||
/*
|
||||
* If we're capturing transition tuples, we might need to convert from the
|
||||
* partition rowtype to root partitioned table's rowtype. But if there
|
||||
* are no BEFORE triggers on the partition that could change the tuple, we
|
||||
* can just remember the original unconverted tuple to avoid a needless
|
||||
* round trip conversion.
|
||||
*/
|
||||
if (mtstate->mt_transition_capture != NULL)
|
||||
{
|
||||
bool has_before_insert_row_trig;
|
||||
|
||||
has_before_insert_row_trig =
|
||||
(targetRelInfo->ri_TrigDesc && targetRelInfo->ri_TrigDesc->trig_insert_before_row);
|
||||
|
||||
mtstate->mt_transition_capture->tcs_original_insert_tuple =
|
||||
!has_before_insert_row_trig ? slot : NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Convert the tuple, if necessary.
|
||||
*/
|
||||
map = targetRelInfo->ri_RootToPartitionMap;
|
||||
if (map != NULL)
|
||||
{
|
||||
TupleTableSlot *new_slot = targetRelInfo->ri_PartitionTupleSlot;
|
||||
|
||||
slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
|
||||
}
|
||||
|
||||
return slot;
|
||||
}
|
||||
|
||||
/*
|
||||
* ExecCheckTupleVisible -- verify tuple is visible
|
||||
*
|
||||
|
Loading…
x
Reference in New Issue
Block a user