Treat segmentby columns the same as compressed columns with a default value (#6817)

This is a minor refactoring that will later make it possible to simplify the
vectorized aggregation code. No functional or performance changes are
expected.
Alexander Kuzmenkov 2024-04-12 11:10:16 +02:00 committed by GitHub
parent 6e7b6e9a6e
commit 4420561485
5 changed files with 108 additions and 67 deletions
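To illustrate the idea of the change, here is a minimal sketch (not taken from the commit; the helper name set_scalar_column is hypothetical) of how a segmentby value and a per-batch default value end up being handled identically: the value is stored once per batch in the decompressed slot and the column is marked DT_Scalar, so the per-row code needs no separate segmentby path.

/*
 * Illustrative sketch only, assuming the types and fields shown in the diff
 * below (CompressedColumnValues, DT_Scalar, output_value, output_isnull).
 * The helper name is made up for illustration.
 */
static void
set_scalar_column(CompressedColumnValues *column_values, TupleTableSlot *decompressed_tuple,
				  int attr_offset, Datum value, bool isnull)
{
	/* The single per-batch value lives in the decompressed slot, not in an arrow array. */
	column_values->decompression_type = DT_Scalar;
	column_values->arrow = NULL;
	column_values->output_value = &decompressed_tuple->tts_values[attr_offset];
	column_values->output_isnull = &decompressed_tuple->tts_isnull[attr_offset];
	*column_values->output_value = value;
	*column_values->output_isnull = isnull;
}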

View File

@@ -159,7 +159,7 @@ static void
decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state,
TupleTableSlot *compressed_slot, int i)
{
CompressionColumnDescription *column_description = &dcontext->template_columns[i];
CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->arrow = NULL;
const AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
@@ -177,7 +177,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
* The column will have a default value for the entire batch;
* set it now.
*/
column_values->decompression_type = DT_Default;
column_values->decompression_type = DT_Scalar;
*column_values->output_value =
getmissingattr(dcontext->decompressed_slot->tts_tupleDescriptor,
@@ -396,22 +396,21 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
Var *var = castNode(Var, linitial(args));
CompressionColumnDescription *column_description = NULL;
int column_index = 0;
for (; column_index < dcontext->num_total_columns; column_index++)
for (; column_index < dcontext->num_data_columns; column_index++)
{
column_description = &dcontext->template_columns[column_index];
column_description = &dcontext->compressed_chunk_columns[column_index];
if (column_description->output_attno == var->varattno)
{
break;
}
}
Ensure(column_index < dcontext->num_total_columns,
Ensure(column_index < dcontext->num_data_columns,
"decompressed column %d not found in batch",
var->varattno);
Assert(column_description != NULL);
Assert(column_description->typid == var->vartype);
Ensure(column_description->type == COMPRESSED_COLUMN,
"only compressed columns are supported in vectorized quals");
Assert(column_index < dcontext->num_compressed_columns);
CompressedColumnValues *column_values = &batch_state->compressed_columns[column_index];
@@ -444,7 +443,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
* with this default value, check if it passes the predicate, and apply
* it to the entire batch.
*/
Assert(column_values->decompression_type == DT_Default);
Assert(column_values->decompression_type == DT_Scalar);
/*
* We saved the actual default value into the decompressed scan slot
@@ -548,7 +547,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
/* Translate the result if the column had a default value. */
if (column_values->arrow == NULL)
{
Assert(column_values->decompression_type == DT_Default);
Assert(column_values->decompression_type == DT_Scalar);
if (!(default_value_predicate_result[0] & 1))
{
/*
@@ -804,20 +803,20 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
MemoryContextReset(batch_state->per_batch_context);
for (int i = 0; i < dcontext->num_total_columns; i++)
for (int i = 0; i < dcontext->num_columns_with_metadata; i++)
{
CompressionColumnDescription *column_description = &dcontext->template_columns[i];
CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
switch (column_description->type)
{
case COMPRESSED_COLUMN:
{
Assert(i < dcontext->num_compressed_columns);
/*
* We decompress the compressed columns on demand, so that we can
* skip decompressing some columns if the entire batch doesn't pass
* the quals. Skip them for now.
*/
Assert(i < dcontext->num_data_columns);
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->decompression_type = DT_Invalid;
column_values->arrow = NULL;
@@ -830,25 +829,32 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
* and our output tuples are read-only, so it's enough to only
* save it once per batch, which we do here.
*/
Assert(i < dcontext->num_data_columns);
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
column_values->decompression_type = DT_Scalar;
AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
decompressed_tuple->tts_values[attr] =
slot_getattr(compressed_slot,
Datum *output_value = &decompressed_tuple->tts_values[attr];
bool *output_isnull = &decompressed_tuple->tts_isnull[attr];
column_values->output_value = output_value;
column_values->output_isnull = output_isnull;
column_values->arrow = NULL;
*output_value = slot_getattr(compressed_slot,
column_description->compressed_scan_attno,
&decompressed_tuple->tts_isnull[attr]);
output_isnull);
/*
* Note that if it's not a by-value type, we should copy it into
* the slot context.
*/
if (!column_description->by_value &&
DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL)
if (!column_description->by_value && !*output_isnull &&
DatumGetPointer(*output_value) != NULL)
{
if (column_description->value_bytes < 0)
{
/* This is a varlena type. */
decompressed_tuple->tts_values[attr] = PointerGetDatum(
detoaster_detoast_attr_copy((struct varlena *)
decompressed_tuple->tts_values[attr],
*output_value = PointerGetDatum(
detoaster_detoast_attr_copy((struct varlena *) *output_value,
&dcontext->detoaster,
batch_state->per_batch_context));
}
@@ -858,9 +864,9 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
void *tmp = MemoryContextAlloc(batch_state->per_batch_context,
column_description->value_bytes);
memcpy(tmp,
DatumGetPointer(decompressed_tuple->tts_values[attr]),
DatumGetPointer(*output_value),
column_description->value_bytes);
decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp);
*output_value = PointerGetDatum(tmp);
}
}
break;
@@ -923,8 +929,8 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
* We have some rows in the batch that pass the vectorized filters, so
* we have to decompress the rest of the compressed columns.
*/
const int num_compressed_columns = dcontext->num_compressed_columns;
for (int i = 0; i < num_compressed_columns; i++)
const int num_data_columns = dcontext->num_data_columns;
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Invalid)
@@ -965,14 +971,14 @@ store_text_datum(CompressedColumnValues *column_values, int arrow_row)
* Doesn't check the quals.
*/
static void
make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_compressed_columns)
make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_data_columns)
{
TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base;
Assert(batch_state->total_batch_rows > 0);
Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
for (int i = 0; i < num_compressed_columns; i++)
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Iterator)
@@ -1035,7 +1041,7 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com
else
{
/* A compressed column with default value, do nothing. */
Assert(column_values->decompression_type == DT_Default);
Assert(column_values->decompression_type == DT_Scalar);
}
}
@@ -1101,7 +1107,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base;
const bool reverse = dcontext->reverse;
const int num_compressed_columns = dcontext->num_compressed_columns;
const int num_data_columns = dcontext->num_data_columns;
for (; batch_state->next_batch_row < batch_state->total_batch_rows;
batch_state->next_batch_row++)
@@ -1116,7 +1122,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
* This row doesn't pass the vectorized quals. Advance the iterated
* compressed columns if we have any.
*/
for (int i = 0; i < num_compressed_columns; i++)
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Iterator)
@@ -1131,7 +1137,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
continue;
}
make_next_tuple(batch_state, arrow_row, num_compressed_columns);
make_next_tuple(batch_state, arrow_row, num_data_columns);
if (!postgres_qual(dcontext, batch_state))
{
@@ -1153,7 +1159,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
* row-by-row have also ended.
*/
Assert(batch_state->next_batch_row == batch_state->total_batch_rows);
for (int i = 0; i < num_compressed_columns; i++)
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
if (column_values->decompression_type == DT_Iterator)
@@ -1191,8 +1197,8 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt
* vectorized decompression is disabled with sorted merge.
*/
#ifdef USE_ASSERT_CHECKING
const int num_compressed_columns = dcontext->num_compressed_columns;
for (int i = 0; i < num_compressed_columns; i++)
const int num_data_columns = dcontext->num_data_columns;
for (int i = 0; i < num_data_columns; i++)
{
CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
Assert(column_values->decompression_type != DT_Invalid);
@@ -1202,7 +1208,7 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt
/* Make the first tuple and save it. */
Assert(batch_state->next_batch_row == 0);
const uint16 arrow_row = dcontext->reverse ? batch_state->total_batch_rows - 1 : 0;
make_next_tuple(batch_state, arrow_row, dcontext->num_compressed_columns);
make_next_tuple(batch_state, arrow_row, dcontext->num_data_columns);
ExecCopySlot(first_tuple_slot, &batch_state->decompressed_scan_slot_data.base);
/*

View File

@@ -14,13 +14,23 @@ typedef struct ArrowArray ArrowArray;
typedef enum
{
DT_ArrowTextDict = -4,
DT_ArrowText = -3,
DT_Default = -2,
/*
* The decompressed value is already in the decompressed slot. This is used
* for segmentby columns and for compressed columns that have a default
* value for the entire batch.
*/
DT_Scalar = -2,
DT_Iterator = -1,
DT_Invalid = 0,
/*
* Any positive number is also valid for the decompression type. It means
* arrow array of a fixed-size by-value type, with size given by the number.
* arrow array of a fixed-size by-value type, with size in bytes given by
* the number.
*/
} DecompressionType;
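As a hedged illustration of how a consumer can branch on these values (a sketch, not code from this commit): the negative values name the special representations, zero is invalid, and any positive value is the element width of a fixed-size by-value arrow array.

/* Illustrative dispatch on DecompressionType; not part of the commit. */
static void
example_column_dispatch(const CompressedColumnValues *column_values)
{
	switch (column_values->decompression_type)
	{
		case DT_Scalar:
			/* One value for the whole batch, already stored in the decompressed slot. */
			break;
		case DT_Iterator:
			/* Row-by-row decompression through an iterator. */
			break;
		case DT_ArrowText:
		case DT_ArrowTextDict:
			/* Arrow text arrays, plain or dictionary-encoded. */
			break;
		case DT_Invalid:
			Assert(false);
			break;
		default:
			/*
			 * Positive values mean an arrow array of a fixed-size by-value
			 * type; the value itself is the element size in bytes.
			 */
			Assert(column_values->decompression_type > 0);
			break;
	}
}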
@@ -93,6 +103,12 @@ typedef struct DecompressBatchState
*/
uint64 *restrict vector_qual_result;
/*
* This array mirrors DecompressContext.compressed_chunk_columns, but contains
* only the leading data columns, not the trailing metadata columns. The data
* columns are the compressed and segmentby columns; their total number is
* given by DecompressContext.num_data_columns.
*/
CompressedColumnValues compressed_columns[FLEXIBLE_ARRAY_MEMBER];
} DecompressBatchState;

View File

@@ -48,9 +48,23 @@ typedef struct CompressionColumnDescription
typedef struct DecompressContext
{
CompressionColumnDescription *template_columns;
int num_total_columns;
int num_compressed_columns;
/*
* Note that this array contains only those columns that are decompressed
* (output_attno != 0), and the order differs from the compressed chunk tuple
* order: the actual data columns come first, followed by the metadata
* columns.
*/
CompressionColumnDescription *compressed_chunk_columns;
/*
* The number of all decompressed columns (output_attno != 0), metadata
* columns included.
*/
int num_columns_with_metadata;
/* This excludes the metadata columns. */
int num_data_columns;
List *vectorized_quals_constified;
bool reverse;
bool batch_sorted_merge; /* Merge append optimization enabled */
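A sketch of the layout invariant described by the comments above (inferred from those comments, not code from the commit): the leading num_data_columns entries of compressed_chunk_columns are the data columns (compressed and segmentby, output_attno > 0), and the trailing entries are the metadata columns. The helper below is hypothetical.

/* Illustrative invariant check for the column array layout. */
static void
check_column_layout(const DecompressContext *dcontext)
{
	Assert(dcontext->num_data_columns <= dcontext->num_columns_with_metadata);
	for (int i = 0; i < dcontext->num_columns_with_metadata; i++)
	{
		const CompressionColumnDescription *col = &dcontext->compressed_chunk_columns[i];
		/* Every column in this array is decompressed, i.e. has a nonzero output attno. */
		Assert(col->output_attno != 0);
		if (i < dcontext->num_data_columns)
		{
			/* Leading entries: actual data columns (compressed or segmentby). */
			Assert(col->output_attno > 0);
		}
		else
		{
			/* Trailing entries: metadata columns (assumed to have negative output attnos). */
			Assert(col->output_attno < 0);
		}
	}
}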

View File

@@ -219,13 +219,15 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
node->custom_ps = lappend(node->custom_ps, ExecInitNode(compressed_scan, estate, eflags));
/*
* Determine which columns we are going to decompress. Since in the hottest
* loop we work only with compressed columns, we'll put them in front of the
* array. So first, count how many compressed and not compressed columns
* we have.
* Count the actual data columns we have to decompress, skipping the
* metadata columns. We only need the metadata columns when initializing the
* compressed batch, so they are not saved in the compressed batch itself;
* it tracks only the data columns. We put the metadata columns at the end
* of the array so that the column indexes are the same in the compressed
* batch state and in the decompression context.
*/
int num_compressed = 0;
int num_total = 0;
int num_data_columns = 0;
int num_columns_with_metadata = 0;
ListCell *dest_cell;
ListCell *is_segmentby_cell;
@@ -242,22 +244,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
continue;
}
if (output_attno > 0 && !lfirst_int(is_segmentby_cell))
if (output_attno > 0)
{
/*
* Not a metadata column and not a segmentby column, hence a
* compressed one.
* Not a metadata column.
*/
num_compressed++;
num_data_columns++;
}
num_total++;
num_columns_with_metadata++;
}
Assert(num_compressed <= num_total);
dcontext->num_compressed_columns = num_compressed;
dcontext->num_total_columns = num_total;
dcontext->template_columns = palloc0(sizeof(CompressionColumnDescription) * num_total);
Assert(num_data_columns <= num_columns_with_metadata);
dcontext->num_data_columns = num_data_columns;
dcontext->num_columns_with_metadata = num_columns_with_metadata;
dcontext->compressed_chunk_columns =
palloc0(sizeof(CompressionColumnDescription) * num_columns_with_metadata);
dcontext->decompressed_slot = node->ss.ss_ScanTupleSlot;
dcontext->ps = &node->ss.ps;
@@ -268,7 +270,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
* separate indices for them.
*/
int current_compressed = 0;
int current_not_compressed = num_compressed;
int current_not_compressed = num_data_columns;
for (int compressed_index = 0; compressed_index < list_length(chunk_state->decompression_map);
compressed_index++)
{
@@ -316,20 +318,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
}
}
if (column.type == COMPRESSED_COLUMN)
if (column.output_attno > 0)
{
Assert(current_compressed < num_total);
dcontext->template_columns[current_compressed++] = column;
/* Data column. */
Assert(current_compressed < num_columns_with_metadata);
dcontext->compressed_chunk_columns[current_compressed++] = column;
}
else
{
Assert(current_not_compressed < num_total);
dcontext->template_columns[current_not_compressed++] = column;
/* Metadata column. */
Assert(current_not_compressed < num_columns_with_metadata);
dcontext->compressed_chunk_columns[current_not_compressed++] = column;
}
}
Assert(current_compressed == num_compressed);
Assert(current_not_compressed == num_total);
Assert(current_compressed == num_data_columns);
Assert(current_not_compressed == num_columns_with_metadata);
/*
* Choose which batch queue we are going to use: heap for batch sorted
@@ -338,7 +342,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
if (dcontext->batch_sorted_merge)
{
chunk_state->batch_queue =
batch_queue_heap_create(num_compressed,
batch_queue_heap_create(num_data_columns,
chunk_state->sortinfo,
dcontext->decompressed_slot->tts_tupleDescriptor,
&BatchQueueFunctionsHeap);
@@ -347,7 +351,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
else
{
chunk_state->batch_queue =
batch_queue_fifo_create(num_compressed, &BatchQueueFunctionsFifo);
batch_queue_fifo_create(num_data_columns, &BatchQueueFunctionsFifo);
chunk_state->exec_methods.ExecCustomScan = decompress_chunk_exec_fifo;
}

View File

@@ -81,9 +81,9 @@ vector_agg_exec(CustomScanState *vector_agg_state)
DecompressContext *dcontext = &decompress_state->decompress_context;
CompressionColumnDescription *value_column_description = NULL;
for (int i = 0; i < dcontext->num_total_columns; i++)
for (int i = 0; i < dcontext->num_data_columns; i++)
{
CompressionColumnDescription *current_column = &dcontext->template_columns[i];
CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i];
if (current_column->output_attno == var->varattno)
{
value_column_description = current_column;
@@ -132,7 +132,8 @@ vector_agg_exec(CustomScanState *vector_agg_state)
Assert(dcontext->enable_bulk_decompression);
Assert(value_column_description->bulk_decompression_supported);
CompressedColumnValues *values =
&batch_state->compressed_columns[value_column_description - dcontext->template_columns];
&batch_state->compressed_columns[value_column_description -
dcontext->compressed_chunk_columns];
Assert(values->decompression_type != DT_Invalid);
arrow = values->arrow;
}
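The index expression in the last hunk relies on pointer arithmetic: value_column_description points into dcontext->compressed_chunk_columns, and because batch_state->compressed_columns mirrors the leading data columns of that array one-to-one, the pointer difference is a valid index into both. A self-contained, generic sketch of the idiom (illustrative only, unrelated to the TimescaleDB types):

#include <assert.h>
#include <stddef.h>

/* Generic illustration of pointer-difference indexing into a parallel array. */
int
main(void)
{
	int columns[4] = { 10, 20, 30, 40 };
	int *found = &columns[2];           /* e.g. the entry located by a linear search */
	ptrdiff_t index = found - columns;  /* == 2, usable as an index into a parallel array */
	assert(index >= 0 && (size_t) index < sizeof(columns) / sizeof(columns[0]));
	return (int) index;
}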