From ff88de9b1c0310559fef35b0eabd643d93c46137 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Mon, 6 Nov 2023 14:30:59 +0100 Subject: [PATCH] Determine data type of vector agg at plan stage This patch adds the support for the dynamic detection of the data type for a vectorized aggregate. In addition, it removes the hard-coded integer data type and initializes the decompression_map properly. This also fixes an invalid memory access. --- .../nodes/decompress_chunk/decompress_chunk.h | 12 +++++ tsl/src/nodes/decompress_chunk/exec.c | 51 +++++++++++-------- tsl/src/nodes/decompress_chunk/exec.h | 1 + tsl/src/nodes/decompress_chunk/planner.c | 15 +++++- 4 files changed, 56 insertions(+), 23 deletions(-) diff --git a/tsl/src/nodes/decompress_chunk/decompress_chunk.h b/tsl/src/nodes/decompress_chunk/decompress_chunk.h index 7c2cd8283..ae774f435 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_chunk.h +++ b/tsl/src/nodes/decompress_chunk/decompress_chunk.h @@ -93,6 +93,18 @@ typedef struct DecompressChunkPath */ bool perform_vectorized_aggregation; + /* + * Columns that are used for vectorized aggregates. The list contains for each attribute -1 if + * this is not a vectorized aggregate column or the Oid of the data type of the attribute. + * + * When creating vectorized aggregates, the decompression logic is not able to determine the + * type of the compressed column based on the output column since we emit partial aggregates + * for this attribute and the raw attribute is not found in the targetlist. So, build a map + * with the used data types here, which is used later to create the compression info + * properly. 
+ */ + List *aggregated_column_type; + List *compressed_pathkeys; bool needs_sequence_num; bool reverse; diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index 4dcbfc3fe..7eded996f 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -142,12 +142,13 @@ decompress_chunk_state_create(CustomScan *cscan) chunk_state->csstate.methods = &chunk_state->exec_methods; Assert(IsA(cscan->custom_private, List)); - Assert(list_length(cscan->custom_private) == 5); + Assert(list_length(cscan->custom_private) == 6); List *settings = linitial(cscan->custom_private); chunk_state->decompression_map = lsecond(cscan->custom_private); chunk_state->is_segmentby_column = lthird(cscan->custom_private); chunk_state->bulk_decompression_column = lfourth(cscan->custom_private); - chunk_state->sortinfo = lfifth(cscan->custom_private); + chunk_state->aggregated_column_type = lfifth(cscan->custom_private); + chunk_state->sortinfo = lsixth(cscan->custom_private); chunk_state->custom_scan_tlist = cscan->custom_scan_tlist; Assert(IsA(settings, IntList)); @@ -162,6 +163,16 @@ decompress_chunk_state_create(CustomScan *cscan) Assert(IsA(cscan->custom_exprs, List)); Assert(list_length(cscan->custom_exprs) == 1); chunk_state->vectorized_quals_original = linitial(cscan->custom_exprs); + Assert(list_length(chunk_state->decompression_map) == + list_length(chunk_state->is_segmentby_column)); + +#ifdef USE_ASSERT_CHECKING + if (chunk_state->perform_vectorized_aggregation) + { + Assert(list_length(chunk_state->decompression_map) == + list_length(chunk_state->aggregated_column_type)); + } +#endif return (Node *) chunk_state; } @@ -307,8 +318,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) ListCell *dest_cell; ListCell *is_segmentby_cell; - Assert(list_length(chunk_state->decompression_map) == - list_length(chunk_state->is_segmentby_column)); + forboth (dest_cell, chunk_state->decompression_map, 
is_segmentby_cell, @@ -364,12 +374,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) if (column.output_attno > 0) { - /* normal column that is also present in decompressed chunk */ - Form_pg_attribute attribute = - TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno)); + if (chunk_state->perform_vectorized_aggregation && + lfirst_int(list_nth_cell(chunk_state->aggregated_column_type, compressed_index)) != + -1) + { + column.typid = lfirst_int( + list_nth_cell(chunk_state->aggregated_column_type, compressed_index)); + } + else + { + /* normal column that is also present in decompressed chunk */ + Form_pg_attribute attribute = + TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno)); - column.typid = attribute->atttypid; - column.value_bytes = get_typlen(column.typid); + column.typid = attribute->atttypid; + column.value_bytes = get_typlen(column.typid); + } if (list_nth_int(chunk_state->is_segmentby_column, compressed_index)) column.type = SEGMENTBY_COLUMN; @@ -630,21 +650,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref) { Assert(chunk_state->enable_bulk_decompression); Assert(column_description->bulk_decompression_supported); - - /* Due to the needed manipulation of the target list to emit partials (see - * decompress_chunk_plan_create), PostgreSQL is not able to determine the type of the - * compressed column automatically. So, correct the column type to the correct value. 
*/ Assert(list_length(aggref->args) == 1); -#ifdef USE_ASSERT_CHECKING - TargetEntry *tlentry = (TargetEntry *) linitial(aggref->args); - Assert(IsA(tlentry->expr, Var)); - Var *input_var = castNode(Var, tlentry->expr); - Assert(input_var->vartype == INT4OID); -#endif - - column_description->typid = INT4OID; - while (true) { TupleTableSlot *compressed_slot = diff --git a/tsl/src/nodes/decompress_chunk/exec.h b/tsl/src/nodes/decompress_chunk/exec.h index 1e9d3b072..73c144885 100644 --- a/tsl/src/nodes/decompress_chunk/exec.h +++ b/tsl/src/nodes/decompress_chunk/exec.h @@ -50,6 +50,7 @@ typedef struct DecompressChunkState List *decompression_map; List *is_segmentby_column; List *bulk_decompression_column; + List *aggregated_column_type; List *custom_scan_tlist; int num_total_columns; int num_compressed_columns; diff --git a/tsl/src/nodes/decompress_chunk/planner.c b/tsl/src/nodes/decompress_chunk/planner.c index 1353af574..548e7a22c 100644 --- a/tsl/src/nodes/decompress_chunk/planner.c +++ b/tsl/src/nodes/decompress_chunk/planner.c @@ -268,6 +268,18 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan .fd = *compression_info, .bulk_decompression_possible = bulk_decompression_possible }; } + + if (path->perform_vectorized_aggregation) + { + Assert(list_length(path->custom_path.path.parent->reltarget->exprs) == 1); + Var *var = linitial(path->custom_path.path.parent->reltarget->exprs); + Assert((Index) var->varno == path->custom_path.path.parent->relid); + if (var->varattno == destination_attno_in_uncompressed_chunk) + path->aggregated_column_type = + lappend_int(path->aggregated_column_type, var->vartype); + else + path->aggregated_column_type = lappend_int(path->aggregated_column_type, -1); + } } /* @@ -891,10 +903,11 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat */ decompress_plan->custom_exprs = list_make1(vectorized_quals); - decompress_plan->custom_private = list_make5(settings, + 
decompress_plan->custom_private = list_make6(settings, dcpath->decompression_map, dcpath->is_segmentby_column, dcpath->bulk_decompression_column, + dcpath->aggregated_column_type, sort_options); return &decompress_plan->scan.plan;