Refactor expression handling for compressed UPDATE/DELETE

This commit is contained in:
Sven Klemm 2024-05-02 11:59:15 +02:00 committed by Sven Klemm
parent 5bf6a3f502
commit 0075bb0c81
4 changed files with 137 additions and 32 deletions

View File

@ -20,6 +20,7 @@ set(SOURCES
event_trigger.c
extension.c
extension_constants.c
expression_utils.c
gapfill.c
guc.c
histogram.c

107
src/expression_utils.c Normal file
View File

@ -0,0 +1,107 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <catalog/pg_type.h>
#include <nodes/primnodes.h>
#include <utils/lsyscache.h>
#include "expression_utils.h"
#include "export.h"
/*
* This function is meant to extract the expression components to be used in a ScanKey.
*
* It will work on the following expression types:
* - Var OP Expr
*
* Var OP Var is not supported as that will not work with scankeys.
*
*/
bool TSDLLEXPORT
ts_extract_expr_args(Expr *expr, Var **var, Expr **arg_value, Oid *opno, Oid *opcode)
{
List *args;
Oid expr_opno, expr_opcode;
switch (nodeTag(expr))
{
case T_OpExpr:
{
OpExpr *opexpr = castNode(OpExpr, expr);
args = opexpr->args;
expr_opno = opexpr->opno;
expr_opcode = opexpr->opfuncid;
if (opexpr->opresulttype != BOOLOID)
return false;
break;
}
case T_ScalarArrayOpExpr:
{
ScalarArrayOpExpr *sa_opexpr = castNode(ScalarArrayOpExpr, expr);
args = sa_opexpr->args;
expr_opno = sa_opexpr->opno;
expr_opcode = sa_opexpr->opfuncid;
break;
}
default:
return false;
}
if (list_length(args) != 2)
return false;
Expr *leftop = linitial(args);
Expr *rightop = lsecond(args);
if (IsA(leftop, RelabelType))
leftop = castNode(RelabelType, leftop)->arg;
if (IsA(rightop, RelabelType))
rightop = castNode(RelabelType, rightop)->arg;
if (IsA(leftop, Var) && !IsA(rightop, Var))
{
/* ignore system columns */
if (castNode(Var, leftop)->varattno <= 0)
return false;
*var = castNode(Var, leftop);
*arg_value = rightop;
*opno = expr_opno;
if (opcode)
*opcode = expr_opcode;
return true;
}
else if (IsA(rightop, Var) && !IsA(leftop, Var))
{
/* ignore system columns */
if (castNode(Var, rightop)->varattno <= 0)
return false;
*var = castNode(Var, rightop);
*arg_value = leftop;
expr_opno = get_commutator(expr_opno);
if (!OidIsValid(expr_opno))
return false;
if (opcode)
{
expr_opcode = get_opcode(expr_opno);
if (!OidIsValid(expr_opcode))
return false;
*opcode = expr_opcode;
}
*opno = expr_opno;
return true;
}
return false;
}

14
src/expression_utils.h Normal file
View File

@ -0,0 +1,14 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <nodes/primnodes.h>
#include <utils/lsyscache.h>
#include "export.h"
bool TSDLLEXPORT ts_extract_expr_args(Expr *expr, Var **var, Expr **arg_value, Oid *opno,
Oid *opcode);

View File

@ -53,6 +53,7 @@
#include "debug_point.h"
#include "deltadelta.h"
#include "dictionary.h"
#include "expression_utils.h"
#include "gorilla.h"
#include "guc.h"
#include "nodes/chunk_dispatch/chunk_insert_state.h"
@ -2538,8 +2539,8 @@ find_matching_index(Relation comp_chunk_rel, List **index_filters, List **heap_f
* be used to build scan keys later.
*/
static void
fill_predicate_context(Chunk *ch, CompressionSettings *settings, List *predicates,
List **heap_filters, List **index_filters, List **is_null)
process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, List **heap_filters,
List **index_filters, List **is_null)
{
ListCell *lc;
foreach (lc, predicates)
@ -2548,44 +2549,26 @@ fill_predicate_context(Chunk *ch, CompressionSettings *settings, List *predicate
Var *var;
char *column_name;
switch (nodeTag(node))
{
case T_OpExpr:
{
OpExpr *opexpr = (OpExpr *) node;
Oid opno = opexpr->opno;
RegProcedure opcode = opexpr->opfuncid;
OpExpr *opexpr = castNode(OpExpr, node);
Oid opno;
RegProcedure opcode;
Oid collation = opexpr->inputcollid;
Expr *leftop, *rightop;
Expr *expr;
Const *arg_value;
leftop = linitial(opexpr->args);
rightop = lsecond(opexpr->args);
if (IsA(leftop, RelabelType))
leftop = ((RelabelType *) leftop)->arg;
if (IsA(rightop, RelabelType))
rightop = ((RelabelType *) rightop)->arg;
if (IsA(leftop, Var) && IsA(rightop, Const))
{
var = (Var *) leftop;
arg_value = (Const *) rightop;
}
else if (IsA(rightop, Var) && IsA(leftop, Const))
{
var = (Var *) rightop;
arg_value = (Const *) leftop;
opno = get_commutator(opno);
if (!OidIsValid(opno))
continue;
opcode = get_opcode(opno);
if (!OidIsValid(opcode))
continue;
}
else
if (!ts_extract_expr_args(&opexpr->xpr, &var, &expr, &opno, &opcode))
continue;
if (!IsA(expr, Const))
continue;
arg_value = castNode(Const, expr);
/* ignore system-defined attributes */
if (var->varattno <= 0)
continue;
@ -3124,7 +3107,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id);
fill_predicate_context(chunk, settings, predicates, &heap_filters, &index_filters, &is_null);
process_predicates(chunk, settings, predicates, &heap_filters, &index_filters, &is_null);
chunk_rel = table_open(chunk->table_id, RowExclusiveLock);
comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock);