From 3d8ec1e43d837ba977822351a9fed6127bf61707 Mon Sep 17 00:00:00 2001
From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com>
Date: Fri, 15 Dec 2023 10:45:18 +0100
Subject: [PATCH] Make the bulk decompression function depend on PG type

This is a refactoring to enable bulk decompression of array and
dictionary compressed text columns, but not other types. Currently has
no effect.
---
 tsl/src/compression/compression.c                 |  2 +-
 tsl/src/compression/compression.h                 |  3 ++-
 tsl/src/compression/decompress_test_impl.c        |  4 ++--
 tsl/src/nodes/decompress_chunk/compressed_batch.c |  3 ++-
 tsl/src/nodes/decompress_chunk/exec.c             |  3 ++-
 tsl/src/nodes/decompress_chunk/planner.c          | 10 +++++++++-
 6 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index e3efff151..d26a8347b 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -130,7 +130,7 @@ DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorith
 }
 
 DecompressAllFunction
-tsl_get_decompress_all_function(CompressionAlgorithm algorithm)
+tsl_get_decompress_all_function(CompressionAlgorithm algorithm, Oid type)
 {
 	if (algorithm >= _END_COMPRESSION_ALGORITHMS)
 		elog(ERROR, "invalid compression algorithm %d", algorithm);
diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h
index 1368083bb..bd62d8404 100644
--- a/tsl/src/compression/compression.h
+++ b/tsl/src/compression/compression.h
@@ -320,7 +320,8 @@ extern void decompress_chunk(Oid in_table, Oid out_table);
 extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
 	CompressionAlgorithm algorithm, bool reverse))(Datum, Oid element_type);
 
-extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithm algorithm);
+extern DecompressAllFunction tsl_get_decompress_all_function(CompressionAlgorithm algorithm,
+															  Oid type);
 
 typedef struct Chunk Chunk;
 typedef struct ChunkInsertState ChunkInsertState;
diff --git a/tsl/src/compression/decompress_test_impl.c b/tsl/src/compression/decompress_test_impl.c
index 69897d45b..c452b1733 100644
--- a/tsl/src/compression/decompress_test_impl.c
+++ b/tsl/src/compression/decompress_test_impl.c
@@ -42,7 +42,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
 		 * For routine fuzzing, we only run bulk decompression to make it faster
 		 * and the coverage space smaller.
 		 */
-		DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
+		DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
 		decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
 		return 0;
 	}
@@ -53,7 +53,7 @@ FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
 	 * the row-by-row is old and stable.
 	 */
 	ArrowArray *arrow = NULL;
-	DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo);
+	DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
 	if (decompress_all)
 	{
 		arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c
index 5313d821f..e0cb3073a 100644
--- a/tsl/src/nodes/decompress_chunk/compressed_batch.c
+++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -123,7 +123,8 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
 	}
 
 	DecompressAllFunction decompress_all =
-		tsl_get_decompress_all_function(header->compression_algorithm);
+		tsl_get_decompress_all_function(header->compression_algorithm,
+										column_description->typid);
 	Assert(decompress_all != NULL);
 
 	MemoryContext context_before_decompression =
diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c
index a7e0b89d5..c97c4ea8e 100644
--- a/tsl/src/nodes/decompress_chunk/exec.c
+++ b/tsl/src/nodes/decompress_chunk/exec.c
@@ -609,7 +609,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 			ArrowArray *arrow = NULL;
 
 			DecompressAllFunction decompress_all =
-				tsl_get_decompress_all_function(header->compression_algorithm);
+				tsl_get_decompress_all_function(header->compression_algorithm,
+												column_description->typid);
 			Assert(decompress_all != NULL);
 
 			MemoryContext context_before_decompression =
diff --git a/tsl/src/nodes/decompress_chunk/planner.c b/tsl/src/nodes/decompress_chunk/planner.c
index d217ef2a2..8a4faffe0 100644
--- a/tsl/src/nodes/decompress_chunk/planner.c
+++ b/tsl/src/nodes/decompress_chunk/planner.c
@@ -211,14 +211,22 @@ build_decompression_map(PlannerInfo *root, DecompressChunkPath *path, List *scan
 			lappend_int(path->decompression_map, destination_attno_in_uncompressed_chunk);
 		path->is_segmentby_column = lappend_int(path->is_segmentby_column, is_segment);
 
+		/*
+		 * Determine if we can use bulk decompression for this column.
+		 */
 		Oid typoid = get_atttype(path->info->chunk_rte->relid, chunk_attno);
 		const bool bulk_decompression_possible =
 			!is_segment && destination_attno_in_uncompressed_chunk > 0 &&
-			tsl_get_decompress_all_function(compression_get_default_algorithm(typoid)) != NULL;
+			tsl_get_decompress_all_function(compression_get_default_algorithm(typoid), typoid) !=
+				NULL;
 		path->have_bulk_decompression_columns |= bulk_decompression_possible;
 		path->bulk_decompression_column =
 			lappend_int(path->bulk_decompression_column, bulk_decompression_possible);
 
+		/*
+		 * Save information about decompressed columns in uncompressed chunk
+		 * for planning of vectorized filters.
+		 */
 		if (destination_attno_in_uncompressed_chunk > 0)
 		{
 			path->uncompressed_chunk_attno_to_compression_info
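
Note (not part of this patch): a minimal sketch of how the new `type` argument could later
gate the bulk path inside tsl_get_decompress_all_function, under the stated goal of enabling
bulk decompression only for array- and dictionary-compressed text columns. The enum values
COMPRESSION_ALGORITHM_ARRAY / COMPRESSION_ALGORITHM_DICTIONARY and the helper functions
text_array_decompress_all() and decompress_all_for_algorithm() are assumptions for
illustration only, not identifiers taken from this diff.

/*
 * Illustrative sketch only -- not from this patch. Returning NULL means "no
 * bulk decompression for this algorithm/type combination"; in planner.c above,
 * bulk_decompression_possible is derived from exactly this NULL check.
 */
#include <postgres.h>
#include <catalog/pg_type.h> /* TEXTOID */

#include "compression/compression.h"

DecompressAllFunction
tsl_get_decompress_all_function(CompressionAlgorithm algorithm, Oid type)
{
	if (algorithm >= _END_COMPRESSION_ALGORITHMS)
		elog(ERROR, "invalid compression algorithm %d", algorithm);

	if (algorithm == COMPRESSION_ALGORITHM_ARRAY ||
		algorithm == COMPRESSION_ALGORITHM_DICTIONARY)
	{
		/*
		 * Hypothetical: bulk decompression for these algorithms would only be
		 * offered for text columns; any other type falls back to the
		 * row-by-row iterator because the caller sees NULL here.
		 */
		return type == TEXTOID ? text_array_decompress_all : NULL;
	}

	/* Hypothetical lookup for the algorithms that do not depend on the type. */
	return decompress_all_for_algorithm(algorithm);
}

Since the planner only marks a column as bulk-decompressable when this function returns
non-NULL for the column's default algorithm and type, threading the type through the call
sites, as this commit does, is sufficient groundwork for restricting the bulk path per type.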