Determine data type of vector agg at plan stage

This patch adds support for dynamically detecting the data type of a
vectorized aggregate. In addition, it removes the hard-coded integer
data type and initializes the decompression_map properly, which also
fixes an invalid memory access.
Author:    Jan Nidzwetzki
Committer: Jan Nidzwetzki
Date:      2023-11-06 14:30:59 +01:00
Commit:    ff88de9b1c (parent e90280a07e)
4 changed files with 56 additions and 23 deletions
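The idea in a minimal standalone sketch (plain C, not TimescaleDB code; every name in it is illustrative): the plan stage records, for each compressed column, either -1 or the type Oid of the column an aggregate is computed over, and the execution stage later reads that Oid instead of assuming a hard-coded int4 type.

/*
 * Standalone sketch of the approach, not TimescaleDB code: the plan stage
 * records, per compressed column, either -1 or the type Oid of the
 * aggregated column; the execution stage reads the recorded Oid instead of
 * hard-coding an integer type. The plain array stands in for the
 * PostgreSQL List used in the patch.
 */
#include <stdio.h>

#define NOT_AGGREGATED (-1)
#define INT4OID 23 /* Oid of int4 in the PostgreSQL catalog */
#define NUM_COLUMNS 3

int
main(void)
{
	/* "Plan stage": only column 1 feeds a vectorized aggregate (an int4 sum) */
	int aggregated_column_type[NUM_COLUMNS] = { NOT_AGGREGATED, INT4OID, NOT_AGGREGATED };

	/* "Execution stage": pick up the recorded type instead of assuming int4 */
	for (int i = 0; i < NUM_COLUMNS; i++)
	{
		if (aggregated_column_type[i] != NOT_AGGREGATED)
			printf("column %d: aggregate input has type Oid %d\n", i, aggregated_column_type[i]);
		else
			printf("column %d: not an aggregated column\n", i);
	}
	return 0;
}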

---- changed file 1 of 4 ----

@@ -93,6 +93,18 @@ typedef struct DecompressChunkPath
 	 */
 	bool perform_vectorized_aggregation;
+
+	/*
+	 * Columns that are used for vectorized aggregates. The list contains, for each attribute, -1 if
+	 * this is not a vectorized aggregate column, or the Oid of the data type of the attribute.
+	 *
+	 * When creating vectorized aggregates, the decompression logic is not able to determine the
+	 * type of the compressed column based on the output column, since we emit partial aggregates
+	 * for this attribute and the raw attribute is not found in the targetlist. So, build a map
+	 * with the used data types here, which is used later to create the compression info
+	 * properly.
+	 */
+	List *aggregated_column_type;
 	List *compressed_pathkeys;
 	bool needs_sequence_num;
 	bool reverse;
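To make the shape of the new list concrete: for a hypothetical query such as SELECT sum(value) FROM metrics over a chunk with columns time, device_id and value, the list built alongside decompression_map could be populated as sketched below (illustrative only; it assumes PostgreSQL's List API and type Oids, and the table and column names are made up).

/* Sketch only: one entry per compressed attribute, in the same order as
 * decompression_map. Only the attribute feeding the vectorized aggregate
 * carries its type Oid; all other attributes carry -1. */
List *aggregated_column_type = NIL;
aggregated_column_type = lappend_int(aggregated_column_type, -1);      /* time: not aggregated */
aggregated_column_type = lappend_int(aggregated_column_type, -1);      /* device_id: segmentby */
aggregated_column_type = lappend_int(aggregated_column_type, INT4OID); /* value: sum() input */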

---- changed file 2 of 4 ----

@@ -142,12 +142,13 @@ decompress_chunk_state_create(CustomScan *cscan)
 	chunk_state->csstate.methods = &chunk_state->exec_methods;
 	Assert(IsA(cscan->custom_private, List));
-	Assert(list_length(cscan->custom_private) == 5);
+	Assert(list_length(cscan->custom_private) == 6);
 	List *settings = linitial(cscan->custom_private);
 	chunk_state->decompression_map = lsecond(cscan->custom_private);
 	chunk_state->is_segmentby_column = lthird(cscan->custom_private);
 	chunk_state->bulk_decompression_column = lfourth(cscan->custom_private);
-	chunk_state->sortinfo = lfifth(cscan->custom_private);
+	chunk_state->aggregated_column_type = lfifth(cscan->custom_private);
+	chunk_state->sortinfo = lsixth(cscan->custom_private);
 	chunk_state->custom_scan_tlist = cscan->custom_scan_tlist;
 	Assert(IsA(settings, IntList));
@@ -162,6 +163,16 @@ decompress_chunk_state_create(CustomScan *cscan)
 	Assert(IsA(cscan->custom_exprs, List));
 	Assert(list_length(cscan->custom_exprs) == 1);
 	chunk_state->vectorized_quals_original = linitial(cscan->custom_exprs);
+	Assert(list_length(chunk_state->decompression_map) ==
+		   list_length(chunk_state->is_segmentby_column));
+#ifdef USE_ASSERT_CHECKING
+	if (chunk_state->perform_vectorized_aggregation)
+	{
+		Assert(list_length(chunk_state->decompression_map) ==
+			   list_length(chunk_state->aggregated_column_type));
+	}
+#endif
 	return (Node *) chunk_state;
 }
@@ -307,8 +318,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 	ListCell *dest_cell;
 	ListCell *is_segmentby_cell;
-	Assert(list_length(chunk_state->decompression_map) ==
-		   list_length(chunk_state->is_segmentby_column));
 	forboth (dest_cell,
 			 chunk_state->decompression_map,
 			 is_segmentby_cell,
@@ -363,6 +373,15 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 		}
 		if (column.output_attno > 0)
+		{
+			if (chunk_state->perform_vectorized_aggregation &&
+				lfirst_int(list_nth_cell(chunk_state->aggregated_column_type, compressed_index)) !=
+					-1)
+			{
+				column.typid = lfirst_int(
+					list_nth_cell(chunk_state->aggregated_column_type, compressed_index));
+			}
+			else
 			{
 				/* normal column that is also present in decompressed chunk */
 				Form_pg_attribute attribute =
@@ -370,6 +389,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 				column.typid = attribute->atttypid;
 				column.value_bytes = get_typlen(column.typid);
+			}
 			if (list_nth_int(chunk_state->is_segmentby_column, compressed_index))
 				column.type = SEGMENTBY_COLUMN;
@ -630,21 +650,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
{ {
Assert(chunk_state->enable_bulk_decompression); Assert(chunk_state->enable_bulk_decompression);
Assert(column_description->bulk_decompression_supported); Assert(column_description->bulk_decompression_supported);
/* Due to the needed manipulation of the target list to emit partials (see
* decompress_chunk_plan_create), PostgreSQL is not able to determine the type of the
* compressed column automatically. So, correct the column type to the correct value. */
Assert(list_length(aggref->args) == 1); Assert(list_length(aggref->args) == 1);
#ifdef USE_ASSERT_CHECKING
TargetEntry *tlentry = (TargetEntry *) linitial(aggref->args);
Assert(IsA(tlentry->expr, Var));
Var *input_var = castNode(Var, tlentry->expr);
Assert(input_var->vartype == INT4OID);
#endif
column_description->typid = INT4OID;
while (true) while (true)
{ {
TupleTableSlot *compressed_slot = TupleTableSlot *compressed_slot =
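Since decompress_chunk_begin now assigns column_description->typid from the plan-time map, the int4 sum path no longer needs to overwrite the type; at most it could verify it. A possible defensive check, sketched here but not part of this patch:

/* Sketch, not part of the patch: the type now arrives from the plan-time
 * map via decompress_chunk_begin, so the int4-specific sum path could
 * verify it instead of overwriting it. */
Assert(column_description->typid == INT4OID);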

---- changed file 3 of 4 ----

@@ -50,6 +50,7 @@ typedef struct DecompressChunkState
 	List *decompression_map;
 	List *is_segmentby_column;
 	List *bulk_decompression_column;
+	List *aggregated_column_type;
 	List *custom_scan_tlist;
 	int num_total_columns;
 	int num_compressed_columns;

---- changed file 4 of 4 ----

@@ -268,6 +268,18 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan
 			.fd = *compression_info, .bulk_decompression_possible = bulk_decompression_possible
 		};
 		}
+		if (path->perform_vectorized_aggregation)
+		{
+			Assert(list_length(path->custom_path.path.parent->reltarget->exprs) == 1);
+			Var *var = linitial(path->custom_path.path.parent->reltarget->exprs);
+			Assert((Index) var->varno == path->custom_path.path.parent->relid);
+			if (var->varattno == destination_attno_in_uncompressed_chunk)
+				path->aggregated_column_type =
+					lappend_int(path->aggregated_column_type, var->vartype);
+			else
+				path->aggregated_column_type = lappend_int(path->aggregated_column_type, -1);
+		}
 	}

 /*
@@ -891,10 +903,11 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
 	 */
 	decompress_plan->custom_exprs = list_make1(vectorized_quals);
-	decompress_plan->custom_private = list_make5(settings,
+	decompress_plan->custom_private = list_make6(settings,
												 dcpath->decompression_map,
												 dcpath->is_segmentby_column,
												 dcpath->bulk_decompression_column,
+												 dcpath->aggregated_column_type,
												 sort_options);
 	return &decompress_plan->scan.plan;
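Taken together, the two sides of the custom_private hand-off above form a positional contract that has to stay in sync. Condensed into one place for reference, using the same list helpers the code above uses:

/* Plan side (decompress_chunk_plan_create): the new list is packed as the
 * fifth entry of custom_private, pushing sortinfo to the sixth slot. */
decompress_plan->custom_private = list_make6(settings,
											 dcpath->decompression_map,
											 dcpath->is_segmentby_column,
											 dcpath->bulk_decompression_column,
											 dcpath->aggregated_column_type,
											 sort_options);

/* Exec side (decompress_chunk_state_create): unpacked from the same positions. */
chunk_state->decompression_map = lsecond(cscan->custom_private);
chunk_state->is_segmentby_column = lthird(cscan->custom_private);
chunk_state->bulk_decompression_column = lfourth(cscan->custom_private);
chunk_state->aggregated_column_type = lfifth(cscan->custom_private);
chunk_state->sortinfo = lsixth(cscan->custom_private);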