Mirror of https://github.com/timescale/timescaledb.git, synced 2025-05-15 01:53:41 +08:00
Avoid projection more often in DecompressChunk node (#6859)
Currently its scan targetlist is always the uncompressed tuple, but in some cases we can make the scan targetlist the same as the required output targetlist, thus avoiding the projection.
This commit is contained in:
parent 30666055b9
commit 442e53608c
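The planner-side idea, in brief: when every output expression is a plain Var of the uncompressed chunk, the scan targetlist can be emitted in exactly the output order, so the scan tuple needs no further projection. The sketch below is a simplified reading of the new follow_uncompressed_output_tlist() that appears further down in this diff; it is illustrative only (system-column handling, resname/ressortgroupref propagation and the qual-coverage check are omitted) and assumes the usual PostgreSQL planner headers.

/*
 * Illustrative sketch, not code from this commit: build a custom scan
 * targetlist that mirrors the output order. Returning NIL means we fall
 * back to scanning the full uncompressed chunk tuple.
 */
static List *
scan_tlist_in_output_order(const PathTarget *pathtarget)
{
	List *result = NIL;
	AttrNumber custom_scan_attno = 1;
	ListCell *lc;

	foreach (lc, pathtarget->exprs)
	{
		Expr *expr = (Expr *) lfirst(lc);

		/* Anything but a plain user-column Var forces a projection. */
		if (!IsA(expr, Var) || castNode(Var, expr)->varattno <= 0)
			return NIL;

		result = lappend(result,
						 makeTargetEntry((Expr *) copyObject(expr),
										 custom_scan_attno++,
										 /* resname = */ NULL,
										 /* resjunk = */ false));
	}

	return result;
}

Once such a list is installed as the CustomScan's custom_scan_tlist, references to the node's outputs become INDEX_VAR references into that list, which is why the executor-side hunks below start checking var->varno == INDEX_VAR.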
@@ -162,7 +162,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
 CompressionColumnDescription *column_description = &dcontext->compressed_chunk_columns[i];
 CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
 column_values->arrow = NULL;
-const AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
+const AttrNumber attr = AttrNumberGetAttrOffset(column_description->custom_scan_attno);
 column_values->output_value = &compressed_batch_current_tuple(batch_state)->tts_values[attr];
 column_values->output_isnull = &compressed_batch_current_tuple(batch_state)->tts_isnull[attr];
 const int value_bytes = get_typlen(column_description->typid);
@@ -179,9 +179,13 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
  */
 column_values->decompression_type = DT_Scalar;

-*column_values->output_value =
-getmissingattr(dcontext->decompressed_slot->tts_tupleDescriptor,
-column_description->output_attno,
+/*
+ * We might use a custom targetlist-based scan tuple which has no
+ * default values, so the default values are fetched from the
+ * uncompressed chunk tuple descriptor.
+ */
+*column_values->output_value = getmissingattr(dcontext->uncompressed_chunk_tdesc,
+column_description->uncompressed_chunk_attno,
 column_values->output_isnull);
 return;
 }
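Context for the hunk above: a scan tuple descriptor derived from a targetlist carries only column names and types, so the stored default ("missing" value) of a column added after compression is not available from it. The snippet below only stitches together lines from other hunks of this same diff to show how the pieces fit at executor startup; it is not new code.

/* Condensed from the decompress_chunk_begin() and decompress_column() hunks
 * in this diff: the scan slot may now hold the custom scan tuple, while
 * default values keep coming from the uncompressed chunk's own descriptor. */
dcontext->custom_scan_slot = node->ss.ss_ScanTupleSlot;
dcontext->uncompressed_chunk_tdesc = RelationGetDescr(node->ss.ss_currentRelation);

/* ... later, when a column is missing from a compressed batch: ... */
*column_values->output_value = getmissingattr(dcontext->uncompressed_chunk_tdesc,
											  column_description->uncompressed_chunk_attno,
											  column_values->output_isnull);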
@@ -399,11 +403,35 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat
 for (; column_index < dcontext->num_data_columns; column_index++)
 {
 column_description = &dcontext->compressed_chunk_columns[column_index];
-if (column_description->output_attno == var->varattno)
+if (var->varno == INDEX_VAR)
+{
+/*
+ * Reference into custom scan tlist, happens when we are using a
+ * non-default custom scan tuple.
+ */
+if (column_description->custom_scan_attno == var->varattno)
 {
 break;
 }
 }
+else
+{
+/*
+ * Reference into uncompressed chunk tuple.
+ *
+ * Note that this is somewhat redundant, because this branch is
+ * taken when we do not use a custom scan tuple, and in this case
+ * the custom scan attno is the same as the uncompressed chunk attno,
+ * so the above branch would do as well. This difference might
+ * become relevant in the future, if we stop outputting the
+ * columns that are needed only for the vectorized quals.
+ */
+if (column_description->uncompressed_chunk_attno == var->varattno)
+{
+break;
+}
+}
+}
 Ensure(column_index < dcontext->num_data_columns,
 "decompressed column %d not found in batch",
 var->varattno);
@@ -734,8 +762,8 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba
 batch_state->per_batch_context = create_per_batch_mctx(dcontext);
 Assert(batch_state->per_batch_context != NULL);

-/* Get a reference to the output TupleTableSlot */
-TupleTableSlot *decompressed_slot = dcontext->decompressed_slot;
+/* Get a reference to the decompressed scan TupleTableSlot */
+TupleTableSlot *decompressed_slot = dcontext->custom_scan_slot;

 /*
  * This code follows Postgres' MakeTupleTableSlot().
@@ -832,7 +860,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
 Assert(i < dcontext->num_data_columns);
 CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
 column_values->decompression_type = DT_Scalar;
-AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno);
+AttrNumber attr = AttrNumberGetAttrOffset(column_description->custom_scan_attno);
 Datum *output_value = &decompressed_tuple->tts_values[attr];
 bool *output_isnull = &decompressed_tuple->tts_isnull[attr];
 column_values->output_value = output_value;
@@ -46,11 +46,6 @@ typedef struct CompressionInfo

 } CompressionInfo;

-typedef struct ColumnCompressionInfo
-{
-bool bulk_decompression_possible;
-} DecompressChunkColumnCompression;
-
 typedef struct DecompressChunkPath
 {
 CustomPath custom_path;
@@ -31,12 +31,18 @@ typedef struct CompressionColumnDescription
 bool by_value;

 /*
- * Attno of the decompressed column in the output of DecompressChunk node.
+ * Attno of the decompressed column in the scan tuple of DecompressChunk node.
  * Negative values are special columns that do not have a representation in
- * the decompressed chunk, but are still used for decompression. They should
- * have the respective `type` field.
+ * the decompressed chunk, but are still used for decompression. The `type`
+ * field is set accordingly for these columns.
  */
-AttrNumber output_attno;
+AttrNumber custom_scan_attno;
+
+/*
+ * Attno of this column in the uncompressed chunks. We use it to fetch the
+ * default value from the uncompressed chunk tuple descriptor.
+ */
+AttrNumber uncompressed_chunk_attno;

 /*
  * Attno of the compressed column in the input compressed chunk scan.
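To make the two attribute-number spaces concrete, here is a hypothetical example; the table and query are invented for illustration and do not appear in the commit.

/*
 * Hypothetical chunk with columns (ts, device, metric2) and a query that
 * outputs only (metric2, ts):
 *
 *   column    uncompressed_chunk_attno   custom_scan_attno
 *   ts        1                          2
 *   device    2                          -   (not in the scan tuple)
 *   metric2   3                          1
 *
 * Without a custom scan targetlist the scan tuple is the whole uncompressed
 * chunk tuple and the two numbers coincide.
 */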
@@ -76,7 +82,15 @@ typedef struct DecompressContext
  */
 MemoryContext bulk_decompression_context;

-TupleTableSlot *decompressed_slot;
+TupleTableSlot *custom_scan_slot;
+
+/*
+ * The scan tuple descriptor might be different from the uncompressed chunk
+ * one, and it doesn't have the default column values in that case, so we
+ * have to fetch the default values from the uncompressed chunk tuple
+ * descriptor which we store here.
+ */
+TupleDesc uncompressed_chunk_tdesc;

 PlanState *ps; /* Set for filtering and instrumentation */

@@ -260,10 +260,11 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 dcontext->num_columns_with_metadata = num_columns_with_metadata;
 dcontext->compressed_chunk_columns =
 palloc0(sizeof(CompressionColumnDescription) * num_columns_with_metadata);
-dcontext->decompressed_slot = node->ss.ss_ScanTupleSlot;
+dcontext->custom_scan_slot = node->ss.ss_ScanTupleSlot;
+dcontext->uncompressed_chunk_tdesc = RelationGetDescr(node->ss.ss_currentRelation);
 dcontext->ps = &node->ss.ps;

-TupleDesc desc = dcontext->decompressed_slot->tts_tupleDescriptor;
+TupleDesc desc = dcontext->custom_scan_slot->tts_tupleDescriptor;

 /*
  * Compressed columns go in front, and the rest go to the back, so we have
@@ -276,22 +277,22 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 {
 CompressionColumnDescription column = {
 .compressed_scan_attno = AttrOffsetGetAttrNumber(compressed_index),
-.output_attno = list_nth_int(chunk_state->decompression_map, compressed_index),
+.custom_scan_attno = list_nth_int(chunk_state->decompression_map, compressed_index),
 .bulk_decompression_supported =
 list_nth_int(chunk_state->bulk_decompression_column, compressed_index)
 };

-if (column.output_attno == 0)
+if (column.custom_scan_attno == 0)
 {
 /* We are asked not to decompress this column, skip it. */
 continue;
 }

-if (column.output_attno > 0)
+if (column.custom_scan_attno > 0)
 {
 /* normal column that is also present in decompressed chunk */
 Form_pg_attribute attribute =
-TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno));
+TupleDescAttr(desc, AttrNumberGetAttrOffset(column.custom_scan_attno));

 column.typid = attribute->atttypid;
 get_typlenbyval(column.typid, &column.value_bytes, &column.by_value);
@@ -300,11 +301,26 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 column.type = SEGMENTBY_COLUMN;
 else
 column.type = COMPRESSED_COLUMN;
+
+if (cscan->custom_scan_tlist == NIL)
+{
+column.uncompressed_chunk_attno = column.custom_scan_attno;
+}
+else
+{
+Var *var =
+castNode(Var,
+castNode(TargetEntry,
+list_nth(cscan->custom_scan_tlist,
+AttrNumberGetAttrOffset(column.custom_scan_attno)))
+->expr);
+column.uncompressed_chunk_attno = var->varattno;
+}
 }
 else
 {
 /* metadata columns */
-switch (column.output_attno)
+switch (column.custom_scan_attno)
 {
 case DECOMPRESS_CHUNK_COUNT_ID:
 column.type = COUNT_COLUMN;
@@ -313,15 +329,15 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 column.type = SEQUENCE_NUM_COLUMN;
 break;
 default:
-elog(ERROR, "Invalid column attno \"%d\"", column.output_attno);
+elog(ERROR, "Invalid column attno \"%d\"", column.custom_scan_attno);
 break;
 }
 }

-if (column.output_attno > 0)
+if (column.custom_scan_attno > 0)
 {
 /* Data column. */
-Assert(current_compressed < num_columns_with_metadata);
+Assert(current_compressed < num_data_columns);
 dcontext->compressed_chunk_columns[current_compressed++] = column;
 }
 else
@@ -344,7 +360,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 chunk_state->batch_queue =
 batch_queue_heap_create(num_data_columns,
 chunk_state->sortinfo,
-dcontext->decompressed_slot->tts_tupleDescriptor,
+dcontext->custom_scan_slot->tts_tupleDescriptor,
 &BatchQueueFunctionsHeap);
 chunk_state->exec_methods.ExecCustomScan = decompress_chunk_exec_heap;
 }
@@ -64,16 +64,66 @@ check_for_system_columns(Bitmapset *attrs_used)
 }
 }

+typedef struct
+{
+bool bulk_decompression_possible;
+int custom_scan_attno;
+} UncompressedColumnInfo;
+
+typedef struct
+{
+/* Can be negative if it's a metadata column, zero if not decompressed. */
+int uncompressed_chunk_attno;
+bool bulk_decompression_possible;
+bool is_segmentby;
+} CompressedColumnInfo;
+
 /*
  * Scratch space for mapping out the decompressed columns.
  */
 typedef struct
 {
+PlannerInfo *root;
+
+DecompressChunkPath *decompress_path;
+
+Bitmapset *uncompressed_attrs_needed;
+
 /*
- * decompression_map maps targetlist entries of the compressed scan to tuple
- * attribute number of the uncompressed chunk. Negative values are special
- * columns in the compressed scan that do not have a representation in the
- * uncompressed chunk, but are still used for decompression.
+ * If we produce at least some columns that support bulk decompression.
+ */
+bool have_bulk_decompression_columns;
+
+/*
+ * Maps the uncompressed chunk attno to the respective column compression
+ * info. This lives only during planning so that we can understand on which
+ * columns we can apply vectorized quals, and which uncompressed attno goes
+ * to which custom scan attno (it's not the same if we're using a custom
+ * scan targetlist).
+ */
+UncompressedColumnInfo *uncompressed_attno_info;
+
+/*
+ * Maps the compressed chunk attno to the respective column compression info.
+ */
+CompressedColumnInfo *compressed_attno_info;
+
+/*
+ * We might use a custom scan targetlist for DecompressChunk node if it
+ * allows us to avoid projection.
+ */
+List *custom_scan_targetlist;
+
+/*
+ * Next, we have basically the same data as the above, but expressed as
+ * several Lists, to allow passing them through the custom plan settings.
+ */
+
+/*
+ * decompression_map maps targetlist entries of the compressed scan to
+ * custom scan attnos. Negative values are metadata columns in the compressed
+ * scan that do not have a representation in the uncompressed chunk, but are
+ * still used for decompression.
  */
 List *decompression_map;

@@ -93,22 +143,88 @@ typedef struct
  */
 List *bulk_decompression_column;

-/*
- * If we produce at least some columns that support bulk decompression.
- */
-bool have_bulk_decompression_columns;
-
-/*
- * Maps the uncompressed chunk attno to the respective column compression
- * info. This lives only during planning so that we can understand on which
- * columns we can apply vectorized quals.
- */
-DecompressChunkColumnCompression *uncompressed_chunk_attno_to_compression_info;
 } DecompressionMapContext;

 /*
- * Given the scan targetlist and the bitmapset of the needed columns, determine
- * which scan columns become which decompressed columns (fill decompression_map).
+ * Try to make the custom scan targetlist that follows the order of the
+ * pathtarget. This would allow us to avoid a projection from scan tuple to
+ * output tuple.
+ * Returns NIL if it's not possible, e.g. if there are whole-row variables or
+ * variables that are used for quals but not for output.
+ */
+static List *
+follow_uncompressed_output_tlist(const DecompressionMapContext *context)
+{
+List *result = NIL;
+Bitmapset *uncompressed_attrs_found = NULL;
+const CompressionInfo *info = context->decompress_path->info;
+const PathTarget *pathtarget = context->decompress_path->custom_path.path.pathtarget;
+int custom_scan_attno = 1;
+for (int i = 0; i < list_length(pathtarget->exprs); i++)
+{
+Expr *expr = list_nth(pathtarget->exprs, i);
+if (!IsA(expr, Var))
+{
+/*
+ * The pathtarget has some non-var expressions, so we won't be able
+ * to build a matching decompressed scan targetlist.
+ */
+return NIL;
+}
+
+Var *var = castNode(Var, expr);
+
+/* This should produce uncompressed chunk columns. */
+Assert((Index) var->varno == info->chunk_rel->relid);
+
+const int uncompressed_chunk_attno = var->varattno;
+if (uncompressed_chunk_attno <= 0)
+{
+/*
+ * The pathtarget has some special vars so we won't be able to
+ * build a matching decompressed scan targetlist.
+ */
+return NIL;
+}
+
+char *attname = get_attname(info->chunk_rte->relid,
+uncompressed_chunk_attno,
+/* missing_ok = */ false);
+
+TargetEntry *target_entry = makeTargetEntry((Expr *) copyObject(var),
+/* resno = */ custom_scan_attno,
+/* resname = */ attname,
+/* resjunk = */ false);
+target_entry->ressortgroupref =
+pathtarget->sortgrouprefs ? pathtarget->sortgrouprefs[i] : 0;
+result = lappend(result, target_entry);
+
+uncompressed_attrs_found =
+bms_add_member(uncompressed_attrs_found,
+uncompressed_chunk_attno - FirstLowInvalidHeapAttributeNumber);
+
+custom_scan_attno++;
+}
+
+if (!bms_equal(uncompressed_attrs_found, context->uncompressed_attrs_needed))
+{
+/*
+ * There are some variables that are not in the pathtarget that are used
+ * for quals. We still have to have them in the scan tuple in this case.
+ * Note that while we could possibly relax this at execution time for
+ * vectorized quals, the requirement that the qual var be found in the
+ * scan targetlist is a Postgres one.
+ */
+return NIL;
+}
+
+return result;
+}
+
+/*
+ * Given the compressed output targetlist and the bitmapset of the needed
+ * columns, determine which compressed chunk column become which uncompressed
+ * chunk column.
  *
  * Note that the uncompressed_attrs_needed bitmap is offset by the
  * FirstLowInvalidHeapAttributeNumber, similar to RelOptInfo.attr_needed. This
@@ -116,10 +232,10 @@ typedef struct
  * attnos.
  */
 static void
-build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
-DecompressChunkPath *path, List *compressed_scan_tlist,
-Bitmapset *uncompressed_attrs_needed)
+build_decompression_map(DecompressionMapContext *context, List *compressed_scan_tlist)
 {
+DecompressChunkPath *path = context->decompress_path;
+CompressionInfo *info = path->info;
 /*
  * Track which normal and metadata columns we were able to find in the
  * targetlist.
@@ -130,12 +246,12 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
 Bitmapset *selectedCols = NULL;

 #if PG16_LT
-selectedCols = path->info->ht_rte->selectedCols;
+selectedCols = info->ht_rte->selectedCols;
 #else
-if (path->info->ht_rte->perminfoindex > 0)
+if (info->ht_rte->perminfoindex > 0)
 {
 RTEPermissionInfo *perminfo =
-getRTEPermissionInfo(root->parse->rteperminfos, path->info->ht_rte);
+getRTEPermissionInfo(context->root->parse->rteperminfos, info->ht_rte);
 selectedCols = perminfo->selectedCols;
 }
 #endif
@@ -152,7 +268,7 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
  * be added at decompression time. Always mark it as found.
  */
 if (bms_is_member(TableOidAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-uncompressed_attrs_needed))
+context->uncompressed_attrs_needed))
 {
 uncompressed_attrs_found =
 bms_add_member(uncompressed_attrs_found,
@@ -161,9 +277,11 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,

 ListCell *lc;

-context->uncompressed_chunk_attno_to_compression_info =
-palloc0(sizeof(*context->uncompressed_chunk_attno_to_compression_info) *
-(path->info->chunk_rel->max_attr + 1));
+context->uncompressed_attno_info =
+palloc0(sizeof(*context->uncompressed_attno_info) * (info->chunk_rel->max_attr + 1));
+
+context->compressed_attno_info =
+palloc0(sizeof(*context->compressed_attno_info) * (info->compressed_rel->max_attr + 1));

 /*
  * Go over the scan targetlist and determine to which output column each
@@ -180,10 +298,10 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
 }

 Var *var = castNode(Var, target->expr);
-Assert((Index) var->varno == path->info->compressed_rel->relid);
-AttrNumber compressed_attno = var->varattno;
+Assert((Index) var->varno == info->compressed_rel->relid);
+AttrNumber compressed_chunk_attno = var->varattno;

-if (compressed_attno == InvalidAttrNumber)
+if (compressed_chunk_attno == InvalidAttrNumber)
 {
 /*
  * We shouldn't have whole-row vars in the compressed scan tlist,
@@ -194,36 +312,37 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
 elog(ERROR, "compressed scan targetlist must not have whole-row vars");
 }

-const char *column_name = get_attname(path->info->compressed_rte->relid,
-compressed_attno,
+const char *column_name = get_attname(info->compressed_rte->relid,
+compressed_chunk_attno,
 /* missing_ok = */ false);
-AttrNumber uncompressed_attno = get_attnum(path->info->chunk_rte->relid, column_name);
+AttrNumber uncompressed_chunk_attno = get_attnum(info->chunk_rte->relid, column_name);

 AttrNumber destination_attno = 0;
-if (uncompressed_attno != InvalidAttrNumber)
+if (uncompressed_chunk_attno != InvalidAttrNumber)
 {
 /*
  * Normal column, not a metadata column.
  */
-Assert(uncompressed_attno != InvalidAttrNumber);
+Assert(uncompressed_chunk_attno != InvalidAttrNumber);

-if (bms_is_member(0 - FirstLowInvalidHeapAttributeNumber, uncompressed_attrs_needed))
+if (bms_is_member(0 - FirstLowInvalidHeapAttributeNumber,
+context->uncompressed_attrs_needed))
 {
 /*
  * attno = 0 means whole-row var. Output all the columns.
  */
-destination_attno = uncompressed_attno;
+destination_attno = uncompressed_chunk_attno;
 uncompressed_attrs_found =
 bms_add_member(uncompressed_attrs_found,
-uncompressed_attno - FirstLowInvalidHeapAttributeNumber);
+uncompressed_chunk_attno - FirstLowInvalidHeapAttributeNumber);
 }
-else if (bms_is_member(uncompressed_attno - FirstLowInvalidHeapAttributeNumber,
-uncompressed_attrs_needed))
+else if (bms_is_member(uncompressed_chunk_attno - FirstLowInvalidHeapAttributeNumber,
+context->uncompressed_attrs_needed))
 {
-destination_attno = uncompressed_attno;
+destination_attno = uncompressed_chunk_attno;
 uncompressed_attrs_found =
 bms_add_member(uncompressed_attrs_found,
-uncompressed_attno - FirstLowInvalidHeapAttributeNumber);
+uncompressed_chunk_attno - FirstLowInvalidHeapAttributeNumber);
 }
 }
 else
@@ -254,33 +373,35 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
 }
 }

-bool is_segment = ts_array_is_member(path->info->settings->fd.segmentby, column_name);
-context->decompression_map = lappend_int(context->decompression_map, destination_attno);
-context->is_segmentby_column = lappend_int(context->is_segmentby_column, is_segment);
+const bool is_segment = ts_array_is_member(info->settings->fd.segmentby, column_name);

 /*
  * Determine if we can use bulk decompression for this column.
  */
-Oid typoid = get_atttype(path->info->chunk_rte->relid, uncompressed_attno);
+Oid typoid = get_atttype(info->chunk_rte->relid, uncompressed_chunk_attno);
 const bool bulk_decompression_possible =
 !is_segment && destination_attno > 0 &&
 tsl_get_decompress_all_function(compression_get_default_algorithm(typoid), typoid) !=
 NULL;
 context->have_bulk_decompression_columns |= bulk_decompression_possible;
-context->bulk_decompression_column =
-lappend_int(context->bulk_decompression_column, bulk_decompression_possible);

 /*
  * Save information about decompressed columns in uncompressed chunk
  * for planning of vectorized filters.
  */
-if (destination_attno > 0)
+if (uncompressed_chunk_attno != InvalidAttrNumber)
 {
-context->uncompressed_chunk_attno_to_compression_info[destination_attno] =
-(DecompressChunkColumnCompression){ .bulk_decompression_possible =
-bulk_decompression_possible };
+context->uncompressed_attno_info[uncompressed_chunk_attno] = (UncompressedColumnInfo){
+.bulk_decompression_possible = bulk_decompression_possible,
+.custom_scan_attno = InvalidAttrNumber,
+};
 }

+context->compressed_attno_info[compressed_chunk_attno] = (CompressedColumnInfo){
+.bulk_decompression_possible = bulk_decompression_possible,
+.uncompressed_chunk_attno = destination_attno,
+.is_segmentby = is_segment,
+};
 }

 /*
@@ -289,17 +410,17 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
  * skip attno 0 in this check.
  */
 Bitmapset *attrs_not_found =
-bms_difference(uncompressed_attrs_needed, uncompressed_attrs_found);
+bms_difference(context->uncompressed_attrs_needed, uncompressed_attrs_found);
 int bit = bms_next_member(attrs_not_found, 0 - FirstLowInvalidHeapAttributeNumber);
 if (bit >= 0)
 {
 elog(ERROR,
 "column '%s' (%d) not found in the targetlist for compressed chunk '%s'",
-get_attname(path->info->chunk_rte->relid,
+get_attname(info->chunk_rte->relid,
 bit + FirstLowInvalidHeapAttributeNumber,
 /* missing_ok = */ true),
 bit + FirstLowInvalidHeapAttributeNumber,
-get_rel_name(path->info->compressed_rte->relid));
+get_rel_name(info->compressed_rte->relid));
 }

 if (missing_count)
@@ -311,6 +432,85 @@ build_decompression_map(PlannerInfo *root, DecompressionMapContext *context,
 {
 elog(ERROR, "the sequence column was not found in the compressed scan targetlist");
 }
+
+/*
+ * If possible, try to make the custom scan targetlist same as the required
+ * output targetlist, so that we can avoid a projection there.
+ */
+context->custom_scan_targetlist = follow_uncompressed_output_tlist(context);
+if (context->custom_scan_targetlist != NIL)
+{
+/*
+ * The decompression will produce a custom scan tuple, set the custom
+ * scan attnos accordingly.
+ */
+int custom_scan_attno = 1;
+foreach (lc, context->custom_scan_targetlist)
+{
+const int uncompressed_chunk_attno =
+castNode(Var, castNode(TargetEntry, lfirst(lc))->expr)->varattno;
+context->uncompressed_attno_info[uncompressed_chunk_attno].custom_scan_attno =
+custom_scan_attno;
+custom_scan_attno++;
+}
+}
+else
+{
+/*
+ * The decompression will produce the uncompressed chunk tuple, set the
+ * custom scan attnos accordingly.
+ * Note that we might have dropped columns here, but we can set these
+ * attnos for them just as well, they won't be decompressed anyway
+ * because they are not in the compressed scan output.
+ */
+for (int i = 1; i <= info->chunk_rel->max_attr; i++)
+{
+UncompressedColumnInfo *uncompressed_info = &context->uncompressed_attno_info[i];
+uncompressed_info->custom_scan_attno = i;
+}
+}
+
+/*
+ * Finally, we have to convert the decompression information we've build
+ * into several lists so that it can be passed through the custom path
+ * settings.
+ */
+foreach (lc, compressed_scan_tlist)
+{
+TargetEntry *target = (TargetEntry *) lfirst(lc);
+Var *var = castNode(Var, target->expr);
+Assert((Index) var->varno == info->compressed_rel->relid);
+const AttrNumber compressed_chunk_attno = var->varattno;
+Assert(compressed_chunk_attno != InvalidAttrNumber);
+CompressedColumnInfo *compressed_info =
+&context->compressed_attno_info[compressed_chunk_attno];
+
+/*
+ * Note that the decompressed custom scan targetlist might follow
+ * neither its output targetlist (when we need more columns for filters)
+ * nor the uncompressed chunk tuple. So here we have to do this
+ * additional conversion.
+ */
+int compressed_column_destination;
+if (compressed_info->uncompressed_chunk_attno <= 0)
+{
+compressed_column_destination = compressed_info->uncompressed_chunk_attno;
+}
+else
+{
+UncompressedColumnInfo *uncompressed_info =
+&context->uncompressed_attno_info[compressed_info->uncompressed_chunk_attno];
+compressed_column_destination = uncompressed_info->custom_scan_attno;
+}
+
+context->decompression_map =
+lappend_int(context->decompression_map, compressed_column_destination);
+context->is_segmentby_column =
+lappend_int(context->is_segmentby_column, compressed_info->is_segmentby);
+context->bulk_decompression_column =
+lappend_int(context->bulk_decompression_column,
+compressed_info->bulk_decompression_possible);
+}
 }

 /* replace vars that reference the compressed table with ones that reference the
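Continuing the hypothetical (ts, device, metric2) example from earlier, and assuming a compressed scan targetlist of (metric2, ts, _ts_meta_count), the list-valued outputs of build_decompression_map() could then look as follows; the values are illustrative, not taken from a real plan.

/*
 * Hypothetical result of the final conversion loop above:
 *
 *   decompression_map         = (1, 2, DECOMPRESS_CHUNK_COUNT_ID)
 *   is_segmentby_column       = (false, false, false)
 *   bulk_decompression_column = (true, true, false)
 *
 * metric2 and ts land in custom scan attnos 1 and 2, while the count
 * metadata column keeps its special negative attno because it has no
 * representation in the uncompressed chunk.
 */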
@@ -602,8 +802,7 @@ make_vectorized_qual(DecompressionMapContext *context, DecompressChunkPath *path
  * ExecQual is performed before ExecProject and operates on the decompressed
  * scan slot, so the qual attnos are the uncompressed chunk attnos.
  */
-if (!context->uncompressed_chunk_attno_to_compression_info[var->varattno]
-.bulk_decompression_possible)
+if (!context->uncompressed_attno_info[var->varattno].bulk_decompression_possible)
 {
 /* This column doesn't support bulk decompression. */
 return NULL;
@@ -763,9 +962,6 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
 decompress_plan->methods = &decompress_chunk_plan_methods;
 decompress_plan->scan.scanrelid = dcpath->info->chunk_rel->relid;

-/* output target list */
-decompress_plan->scan.plan.targetlist = output_targetlist;
-
 if (IsA(compressed_path, IndexPath))
 {
 /*
@@ -864,14 +1060,12 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
 /*
  * Determine which compressed column goes to which output column.
  */
-DecompressionMapContext context = { 0 };
-build_decompression_map(root,
-&context,
-dcpath,
-compressed_scan->plan.targetlist,
-uncompressed_attrs_needed);
+DecompressionMapContext context = { .root = root,
+.decompress_path = dcpath,
+.uncompressed_attrs_needed = uncompressed_attrs_needed };
+build_decompression_map(&context, compressed_scan->plan.targetlist);

-/* Build heap sort info for sorted_merge_append */
+/* Build heap sort info for batch sorted merge. */
 List *sort_options = NIL;

 if (dcpath->batch_sorted_merge)
@@ -930,13 +1124,17 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat

 /*
  * We found a Var equivalence member that belongs to the
- * decompressed relation. We can use its varattno directly for
- * the comparison operator, because it operates on the
- * decompressed scan tuple.
+ * decompressed relation. We have to convert its varattno which
+ * is the varattno of the uncompressed chunk tuple, to the
+ * decompressed scan tuple varattno.
  */
 Var *var = castNode(Var, em->em_expr);
 Assert((Index) var->varno == (Index) em_relid);

+const int decompressed_scan_attno =
+context.uncompressed_attno_info[var->varattno].custom_scan_attno;
+Assert(decompressed_scan_attno > 0);
+
 /*
  * Look up the correct sort operator from the PathKey's slightly
  * abstracted representation.
@@ -953,7 +1151,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
 var->vartype,
 pk->pk_opfamily);

-sort_col_idx = lappend_oid(sort_col_idx, var->varattno);
+sort_col_idx = lappend_oid(sort_col_idx, decompressed_scan_attno);
 sort_collations = lappend_oid(sort_collations, var->varcollid);
 sort_nulls = lappend_oid(sort_nulls, pk->pk_nulls_first);
 sort_ops = lappend_oid(sort_ops, sortop);
@@ -1125,11 +1323,17 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
 lfirst(list_nth_cell(decompress_plan->custom_private, DCP_SortInfo)) = sort_options;

 /*
- * Note that our scan tuple type is uncompressed chunk tuple. This is the
- * assumption of decompression map and generally of all decompression
- * functions.
+ * We might be using a custom scan tuple if it allows us to avoid the
+ * projection. Otherwise, this tlist is NIL and we'll be using the
+ * uncompressed tuple as the custom scan tuple.
  */
-decompress_plan->custom_scan_tlist = NIL;
+decompress_plan->custom_scan_tlist = context.custom_scan_targetlist;
+
+/*
+ * Note that we cannot decide here that we require a projection. It is
+ * decided at Path stage, now we must produce the requested targetlist.
+ */
+decompress_plan->scan.plan.targetlist = output_targetlist;

 return &decompress_plan->scan.plan;
 }
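Why installing the custom scan targetlist removes the projection: at executor startup PostgreSQL only builds a ProjectionInfo when the plan targetlist does not already match the scan tuple. Below is a simplified background sketch of that check, modeled on the core executor (ExecConditionalAssignProjectionInfo / tlist_matches_tupdesc); it is not code from this commit and glosses over the type and typmod comparisons.

/*
 * Background sketch: projection can be skipped when the plan targetlist is
 * exactly "Var 1, Var 2, ..." over the scan tuple, which is what the custom
 * scan targetlist built by this commit guarantees in the supported cases.
 */
static bool
tlist_is_trivial_projection(List *targetlist, Index scan_varno, int scan_natts)
{
	AttrNumber expected_attno = 1;
	ListCell *lc;

	foreach (lc, targetlist)
	{
		TargetEntry *tle = lfirst_node(TargetEntry, lc);

		if (!IsA(tle->expr, Var))
			return false;

		Var *var = castNode(Var, tle->expr);
		if (var->varno != scan_varno || var->varattno != expected_attno++)
			return false;
	}

	/* The real executor check also requires matching types and lengths. */
	return expected_attno - 1 == scan_natts;
}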
@@ -84,7 +84,7 @@ vector_agg_exec(CustomScanState *vector_agg_state)
 for (int i = 0; i < dcontext->num_data_columns; i++)
 {
 CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i];
-if (current_column->output_attno == var->varattno)
+if (current_column->uncompressed_chunk_attno == var->varattno)
 {
 value_column_description = current_column;
 break;
@@ -156,7 +156,7 @@ vector_agg_exec(CustomScanState *vector_agg_state)
 Assert(n > 0);
 }

-int offs = AttrNumberGetAttrOffset(value_column_description->output_attno);
+int offs = AttrNumberGetAttrOffset(value_column_description->custom_scan_attno);
 agg->agg_const(batch_state->decompressed_scan_slot_data.base.tts_values[offs],
 batch_state->decompressed_scan_slot_data.base.tts_isnull[offs],
 n,
@@ -76,16 +76,29 @@ resolve_outer_special_vars_mutator(Node *node, void *context)
 return expression_tree_mutator(node, resolve_outer_special_vars_mutator, context);
 }

-Var *var = castNode(Var, node);
-if (var->varno != OUTER_VAR)
-{
-return node;
-}
+Var *aggregated_var = castNode(Var, node);
+Ensure(aggregated_var->varno == OUTER_VAR,
+"encountered unexpected varno %d as an aggregate argument",
+aggregated_var->varno);

+CustomScan *custom = castNode(CustomScan, context);
 TargetEntry *decompress_chunk_tentry =
-castNode(TargetEntry, list_nth(context, var->varattno - 1));
-Var *uncompressed_var = castNode(Var, decompress_chunk_tentry->expr);
-return (Node *) copyObject(uncompressed_var);
+castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, aggregated_var->varattno - 1));
+Var *decompressed_var = castNode(Var, decompress_chunk_tentry->expr);
+if (decompressed_var->varno == INDEX_VAR)
+{
+/*
+ * This is a reference into the custom scan targetlist, we have to resolve
+ * it as well.
+ */
+decompressed_var =
+castNode(Var,
+castNode(TargetEntry,
+list_nth(custom->custom_scan_tlist, decompressed_var->varattno - 1))
+->expr);
+}
+Assert(decompressed_var->varno > 0);
+return (Node *) copyObject(decompressed_var);
 }

 /*
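A worked illustration of the two-level Var resolution performed above; all attnos here are hypothetical.

/*
 * Hypothetical walk-through of resolve_outer_special_vars_mutator():
 *
 *   aggregate argument:                            Var(varno = OUTER_VAR, varattno = 1)
 *   DecompressChunk scan.plan.targetlist entry 1:  Var(varno = INDEX_VAR, varattno = 2)
 *   DecompressChunk custom_scan_tlist entry 2:     Var(varno = chunk relid, varattno = 3)
 *
 * so the aggregate argument finally resolves to uncompressed chunk column 3.
 * When the DecompressChunk node has no custom scan targetlist, the first
 * lookup already yields a plain chunk Var and the second step is skipped.
 */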
@@ -94,9 +107,9 @@ resolve_outer_special_vars_mutator(Node *node, void *context)
  * variables.
  */
 static List *
-resolve_outer_special_vars(List *agg_tlist, List *outer_tlist)
+resolve_outer_special_vars(List *agg_tlist, CustomScan *custom)
 {
-return castNode(List, resolve_outer_special_vars_mutator((Node *) agg_tlist, outer_tlist));
+return castNode(List, resolve_outer_special_vars_mutator((Node *) agg_tlist, custom));
 }

 /*
@@ -116,8 +129,7 @@ vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk)
  * the previous planning stages, and they contain special varnos referencing
  * the scan targetlists.
  */
-custom->custom_scan_tlist =
-resolve_outer_special_vars(agg->plan.targetlist, decompress_chunk->scan.plan.targetlist);
+custom->custom_scan_tlist = resolve_outer_special_vars(agg->plan.targetlist, decompress_chunk);
 custom->scan.plan.targetlist =
 build_trivial_custom_output_targetlist(custom->custom_scan_tlist);

@@ -60,6 +60,31 @@ select * from vectorqual order by vectorqual;
 Sun Jan 01 00:00:00 2023 | 42 | 4 | 43 | 44
 (4 rows)

+-- single chunk
+select * from vectorqual where ts between '2019-02-02' and '2020-02-02' order by vectorqual;
+ts | metric2 | device | metric3 | metric4
+--------------------------+---------+--------+---------+---------
+Wed Jan 01 00:00:00 2020 | 12 | 1 | 777 |
+(1 row)
+
+select * from vectorqual where ts between '2020-02-02' and '2021-02-02' order by vectorqual;
+ts | metric2 | device | metric3 | metric4
+--------------------------+---------+--------+---------+---------
+Fri Jan 01 00:00:00 2021 | 22 | 2 | 777 |
+(1 row)
+
+select * from vectorqual where ts between '2021-02-02' and '2022-02-02' order by vectorqual;
+ts | metric2 | device | metric3 | metric4
+--------------------------+---------+--------+---------+---------
+Sat Jan 01 00:00:00 2022 | 32 | 3 | 33 |
+(1 row)
+
+select * from vectorqual where ts between '2022-02-02' and '2023-02-02' order by vectorqual;
+ts | metric2 | device | metric3 | metric4
+--------------------------+---------+--------+---------+---------
+Sun Jan 01 00:00:00 2023 | 42 | 4 | 43 | 44
+(1 row)
+
 set timescaledb.debug_require_vector_qual to 'only' /* all following quals must be vectorized */;
 select count(*) from vectorqual where ts > '1999-01-01 00:00:00';
 count
@@ -27,6 +27,12 @@ select count(compress_chunk(x, true)) from show_chunks('vectorqual') x;

 select * from vectorqual order by vectorqual;

+-- single chunk
+select * from vectorqual where ts between '2019-02-02' and '2020-02-02' order by vectorqual;
+select * from vectorqual where ts between '2020-02-02' and '2021-02-02' order by vectorqual;
+select * from vectorqual where ts between '2021-02-02' and '2022-02-02' order by vectorqual;
+select * from vectorqual where ts between '2022-02-02' and '2023-02-02' order by vectorqual;
+
 set timescaledb.debug_require_vector_qual to 'only' /* all following quals must be vectorized */;
 select count(*) from vectorqual where ts > '1999-01-01 00:00:00';
 select count(*) from vectorqual where metric2 = 22;