diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index ad3843353..8d06ae0e6 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -159,7 +159,7 @@ static void decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, TupleTableSlot *compressed_slot, int i) { - CompressionColumnDescription *column_description = &dcontext->template_columns[i]; + CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i]; CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; column_values->arrow = NULL; const AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno); @@ -177,7 +177,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state * The column will have a default value for the entire batch, * set it now. */ - column_values->decompression_type = DT_Default; + column_values->decompression_type = DT_Scalar; *column_values->output_value = getmissingattr(dcontext->decompressed_slot->tts_tupleDescriptor, @@ -396,22 +396,21 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat Var *var = castNode(Var, linitial(args)); CompressionColumnDescription *column_description = NULL; int column_index = 0; - for (; column_index < dcontext->num_total_columns; column_index++) + for (; column_index < dcontext->num_data_columns; column_index++) { - column_description = &dcontext->template_columns[column_index]; + column_description = &dcontext->compressed_chunk_columns[column_index]; if (column_description->output_attno == var->varattno) { break; } } - Ensure(column_index < dcontext->num_total_columns, + Ensure(column_index < dcontext->num_data_columns, "decompressed column %d not found in batch", var->varattno); Assert(column_description != NULL); Assert(column_description->typid == var->vartype); 
Ensure(column_description->type == COMPRESSED_COLUMN, "only compressed columns are supported in vectorized quals"); - Assert(column_index < dcontext->num_compressed_columns); CompressedColumnValues *column_values = &batch_state->compressed_columns[column_index]; @@ -444,7 +443,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat * with this default value, check if it passes the predicate, and apply * it to the entire batch. */ - Assert(column_values->decompression_type == DT_Default); + Assert(column_values->decompression_type == DT_Scalar); /* * We saved the actual default value into the decompressed scan slot @@ -548,7 +547,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat /* Translate the result if the column had a default value. */ if (column_values->arrow == NULL) { - Assert(column_values->decompression_type == DT_Default); + Assert(column_values->decompression_type == DT_Scalar); if (!(default_value_predicate_result[0] & 1)) { /* @@ -804,20 +803,20 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, MemoryContextReset(batch_state->per_batch_context); - for (int i = 0; i < dcontext->num_total_columns; i++) + for (int i = 0; i < dcontext->num_columns_with_metadata; i++) { - CompressionColumnDescription *column_description = &dcontext->template_columns[i]; + CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i]; switch (column_description->type) { case COMPRESSED_COLUMN: { - Assert(i < dcontext->num_compressed_columns); /* * We decompress the compressed columns on demand, so that we can * skip decompressing some columns if the entire batch doesn't pass * the quals. Skip them for now. 
*/ + Assert(i < dcontext->num_data_columns); CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; column_values->decompression_type = DT_Invalid; column_values->arrow = NULL; @@ -830,25 +829,32 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, * and our output tuples are read-only, so it's enough to only * save it once per batch, which we do here. */ + Assert(i < dcontext->num_data_columns); + CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; + column_values->decompression_type = DT_Scalar; AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno); - decompressed_tuple->tts_values[attr] = - slot_getattr(compressed_slot, - column_description->compressed_scan_attno, - &decompressed_tuple->tts_isnull[attr]); + Datum *output_value = &decompressed_tuple->tts_values[attr]; + bool *output_isnull = &decompressed_tuple->tts_isnull[attr]; + column_values->output_value = output_value; + column_values->output_isnull = output_isnull; + column_values->arrow = NULL; + + *output_value = slot_getattr(compressed_slot, + column_description->compressed_scan_attno, + output_isnull); /* * Note that if it's not a by-value type, we should copy it into * the slot context. */ - if (!column_description->by_value && - DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) + if (!column_description->by_value && !*output_isnull && + DatumGetPointer(*output_value) != NULL) { if (column_description->value_bytes < 0) { /* This is a varlena type. 
*/ - decompressed_tuple->tts_values[attr] = PointerGetDatum( - detoaster_detoast_attr_copy((struct varlena *) - decompressed_tuple->tts_values[attr], + *output_value = PointerGetDatum( + detoaster_detoast_attr_copy((struct varlena *) *output_value, &dcontext->detoaster, batch_state->per_batch_context)); } @@ -858,9 +864,9 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, void *tmp = MemoryContextAlloc(batch_state->per_batch_context, column_description->value_bytes); memcpy(tmp, - DatumGetPointer(decompressed_tuple->tts_values[attr]), + DatumGetPointer(*output_value), column_description->value_bytes); - decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp); + *output_value = PointerGetDatum(tmp); } } break; @@ -923,8 +929,8 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, * We have some rows in the batch that pass the vectorized filters, so * we have to decompress the rest of the compressed columns. */ - const int num_compressed_columns = dcontext->num_compressed_columns; - for (int i = 0; i < num_compressed_columns; i++) + const int num_data_columns = dcontext->num_data_columns; + for (int i = 0; i < num_data_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Invalid) @@ -965,14 +971,14 @@ store_text_datum(CompressedColumnValues *column_values, int arrow_row) * Doesn't check the quals. 
*/ static void -make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_compressed_columns) +make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_data_columns) { TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base; Assert(batch_state->total_batch_rows > 0); Assert(batch_state->next_batch_row < batch_state->total_batch_rows); - for (int i = 0; i < num_compressed_columns; i++) + for (int i = 0; i < num_data_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Iterator) @@ -1035,7 +1041,7 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com else { /* A compressed column with default value, do nothing. */ - Assert(column_values->decompression_type == DT_Default); + Assert(column_values->decompression_type == DT_Scalar); } } @@ -1101,7 +1107,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc TupleTableSlot *decompressed_scan_slot = &batch_state->decompressed_scan_slot_data.base; const bool reverse = dcontext->reverse; - const int num_compressed_columns = dcontext->num_compressed_columns; + const int num_data_columns = dcontext->num_data_columns; for (; batch_state->next_batch_row < batch_state->total_batch_rows; batch_state->next_batch_row++) @@ -1116,7 +1122,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc * This row doesn't pass the vectorized quals. Advance the iterated * compressed columns if we have any. 
*/ - for (int i = 0; i < num_compressed_columns; i++) + for (int i = 0; i < num_data_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Iterator) @@ -1131,7 +1137,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc continue; } - make_next_tuple(batch_state, arrow_row, num_compressed_columns); + make_next_tuple(batch_state, arrow_row, num_data_columns); if (!postgres_qual(dcontext, batch_state)) { @@ -1153,7 +1159,7 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc * row-by-row have also ended. */ Assert(batch_state->next_batch_row == batch_state->total_batch_rows); - for (int i = 0; i < num_compressed_columns; i++) + for (int i = 0; i < num_data_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Iterator) @@ -1191,8 +1197,8 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt * vectorized decompression is disabled with sorted merge. */ #ifdef USE_ASSERT_CHECKING - const int num_compressed_columns = dcontext->num_compressed_columns; - for (int i = 0; i < num_compressed_columns; i++) + const int num_data_columns = dcontext->num_data_columns; + for (int i = 0; i < num_data_columns; i++) { CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; Assert(column_values->decompression_type != DT_Invalid); @@ -1202,7 +1208,7 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt /* Make the first tuple and save it. */ Assert(batch_state->next_batch_row == 0); const uint16 arrow_row = dcontext->reverse ? 
batch_state->total_batch_rows - 1 : 0; - make_next_tuple(batch_state, arrow_row, dcontext->num_compressed_columns); + make_next_tuple(batch_state, arrow_row, dcontext->num_data_columns); ExecCopySlot(first_tuple_slot, &batch_state->decompressed_scan_slot_data.base); /* diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.h b/tsl/src/nodes/decompress_chunk/compressed_batch.h index 486f3e9c6..b4e93d1d8 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.h +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.h @@ -14,13 +14,23 @@ typedef struct ArrowArray ArrowArray; typedef enum { DT_ArrowTextDict = -4, + DT_ArrowText = -3, - DT_Default = -2, + + /* + * The decompressed value is already in the decompressed slot. This is used + * for segmentby columns and for compressed columns with a default value for the batch. + */ + DT_Scalar = -2, + DT_Iterator = -1, + DT_Invalid = 0, + /* * Any positive number is also valid for the decompression type. It means - * arrow array of a fixed-size by-value type, with size given by the number. + * arrow array of a fixed-size by-value type, with size in bytes given by + * the number. */ } DecompressionType; @@ -93,6 +103,12 @@ typedef struct DecompressBatchState */ uint64 *restrict vector_qual_result; + /* + * This follows DecompressContext.compressed_chunk_columns, but includes only + * the leading data columns and not the trailing metadata columns. The data + * columns are the compressed and segmentby columns; their total number is + * given by DecompressContext.num_data_columns. 
+ */ CompressedColumnValues compressed_columns[FLEXIBLE_ARRAY_MEMBER]; } DecompressBatchState; diff --git a/tsl/src/nodes/decompress_chunk/decompress_context.h b/tsl/src/nodes/decompress_chunk/decompress_context.h index 0d084ace1..fd9d0855b 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_context.h +++ b/tsl/src/nodes/decompress_chunk/decompress_context.h @@ -48,9 +48,23 @@ typedef struct CompressionColumnDescription typedef struct DecompressContext { - CompressionColumnDescription *template_columns; - int num_total_columns; - int num_compressed_columns; + /* + * Note that this array contains only those columns that are decompressed + * (output_attno != 0), and the order is different from the compressed chunk + * tuple order: first go the actual data columns, and after that the metadata + * columns. + */ + CompressionColumnDescription *compressed_chunk_columns; + + /* + * This includes all decompressed columns (output_attno != 0), including the + * metadata columns. + */ + int num_columns_with_metadata; + + /* This excludes the metadata columns. */ + int num_data_columns; + List *vectorized_quals_constified; bool reverse; bool batch_sorted_merge; /* Merge append optimization enabled */ diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index f6959b656..c73b52aa4 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -219,13 +219,15 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) node->custom_ps = lappend(node->custom_ps, ExecInitNode(compressed_scan, estate, eflags)); /* - * Determine which columns we are going to decompress. Since in the hottest - * loop we work only with compressed columns, we'll put them in front of the - * array. So first, count how many compressed and not compressed columns - * we have. + * Count the actual data columns we have to decompress, skipping the + * metadata columns. 
We only need the metadata columns when initializing the + * compressed batch, so they are not saved in the compressed batch itself; + * it tracks only the data columns. We put the metadata columns at the end + * of the array to have the same column indexes in compressed batch state + * and in decompression context. */ - int num_compressed = 0; - int num_total = 0; + int num_data_columns = 0; + int num_columns_with_metadata = 0; ListCell *dest_cell; ListCell *is_segmentby_cell; @@ -242,22 +244,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) continue; } - if (output_attno > 0 && !lfirst_int(is_segmentby_cell)) + if (output_attno > 0) { /* - * Not a metadata column and not a segmentby column, hence a - * compressed one. + * Not a metadata column. */ - num_compressed++; + num_data_columns++; } - num_total++; + num_columns_with_metadata++; } - Assert(num_compressed <= num_total); - dcontext->num_compressed_columns = num_compressed; - dcontext->num_total_columns = num_total; - dcontext->template_columns = palloc0(sizeof(CompressionColumnDescription) * num_total); + Assert(num_data_columns <= num_columns_with_metadata); + dcontext->num_data_columns = num_data_columns; + dcontext->num_columns_with_metadata = num_columns_with_metadata; + dcontext->compressed_chunk_columns = + palloc0(sizeof(CompressionColumnDescription) * num_columns_with_metadata); dcontext->decompressed_slot = node->ss.ss_ScanTupleSlot; dcontext->ps = &node->ss.ps; @@ -268,7 +270,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) * separate indices for them. 
*/ int current_compressed = 0; - int current_not_compressed = num_compressed; + int current_not_compressed = num_data_columns; for (int compressed_index = 0; compressed_index < list_length(chunk_state->decompression_map); compressed_index++) { @@ -316,20 +318,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) } } - if (column.type == COMPRESSED_COLUMN) + if (column.output_attno > 0) { - Assert(current_compressed < num_total); - dcontext->template_columns[current_compressed++] = column; + /* Data column. */ + Assert(current_compressed < num_columns_with_metadata); + dcontext->compressed_chunk_columns[current_compressed++] = column; } else { - Assert(current_not_compressed < num_total); - dcontext->template_columns[current_not_compressed++] = column; + /* Metadata column. */ + Assert(current_not_compressed < num_columns_with_metadata); + dcontext->compressed_chunk_columns[current_not_compressed++] = column; } } - Assert(current_compressed == num_compressed); - Assert(current_not_compressed == num_total); + Assert(current_compressed == num_data_columns); + Assert(current_not_compressed == num_columns_with_metadata); /* * Choose which batch queue we are going to use: heap for batch sorted @@ -338,7 +342,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) if (dcontext->batch_sorted_merge) { chunk_state->batch_queue = - batch_queue_heap_create(num_compressed, + batch_queue_heap_create(num_data_columns, chunk_state->sortinfo, dcontext->decompressed_slot->tts_tupleDescriptor, &BatchQueueFunctionsHeap); @@ -347,7 +351,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) else { chunk_state->batch_queue = - batch_queue_fifo_create(num_compressed, &BatchQueueFunctionsFifo); + batch_queue_fifo_create(num_data_columns, &BatchQueueFunctionsFifo); chunk_state->exec_methods.ExecCustomScan = decompress_chunk_exec_fifo; } diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c 
index a212826ca..f08dba0bb 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -81,9 +81,9 @@ vector_agg_exec(CustomScanState *vector_agg_state) DecompressContext *dcontext = &decompress_state->decompress_context; CompressionColumnDescription *value_column_description = NULL; - for (int i = 0; i < dcontext->num_total_columns; i++) + for (int i = 0; i < dcontext->num_data_columns; i++) { - CompressionColumnDescription *current_column = &dcontext->template_columns[i]; + CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i]; if (current_column->output_attno == var->varattno) { value_column_description = current_column; @@ -132,7 +132,8 @@ vector_agg_exec(CustomScanState *vector_agg_state) Assert(dcontext->enable_bulk_decompression); Assert(value_column_description->bulk_decompression_supported); CompressedColumnValues *values = - &batch_state->compressed_columns[value_column_description - dcontext->template_columns]; + &batch_state->compressed_columns[value_column_description - + dcontext->compressed_chunk_columns]; Assert(values->decompression_type != DT_Invalid); arrow = values->arrow; }