Bulk decompression of text columns

Implement bulk decompression for text columns. This will allow us to use
them in the vectorized computation pipeline.
This commit is contained in:
Alexander Kuzmenkov 2024-01-31 13:43:12 +01:00
parent 85b27b4f34
commit bf20e5f970
70 changed files with 562 additions and 92 deletions

View File

@ -65,7 +65,7 @@ jobs:
CC=clang ./configure --prefix=$HOME/$PG_INSTALL_DIR --with-openssl \
--without-readline --without-zlib --without-libxml --enable-cassert \
--enable-debug CC=clang \
CFLAGS="-DTS_COMPRESSION_FUZZING=1 -fuse-ld=lld -ggdb3 -O2 -fno-omit-frame-pointer"
CFLAGS="-fuse-ld=lld -ggdb3 -O2 -fno-omit-frame-pointer"
make -j$(nproc)
- name: Install PostgreSQL
@ -93,7 +93,7 @@ jobs:
cmake -B build -S . -DASSERTIONS=ON -DLINTER=OFF -DCMAKE_VERBOSE_MAKEFILE=1 \
-DWARNINGS_AS_ERRORS=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_C_COMPILER=clang \
-DCMAKE_C_FLAGS="-fsanitize=fuzzer-no-link -lstdc++ -L$LIBFUZZER_PATH -l:libclang_rt.fuzzer_no_main-x86_64.a -static-libsan" \
-DPG_PATH=$HOME/$PG_INSTALL_DIR
-DCOMPRESSION_FUZZING=1 -DPG_PATH=$HOME/$PG_INSTALL_DIR
make -C build -j$(nproc) install
@ -124,7 +124,9 @@ jobs:
# array has a peculiar recv function that recompresses all input, so
# fuzzing it is much slower. The dictionary recv also uses it.
{ algo: array , pgtype: text , bulk: false, runs: 10000000 },
{ algo: array , pgtype: text , bulk: true , runs: 10000000 },
{ algo: dictionary, pgtype: text , bulk: false, runs: 100000000 },
{ algo: dictionary, pgtype: text , bulk: true , runs: 100000000 },
]
name: Fuzz decompression ${{ matrix.case.algo }} ${{ matrix.case.pgtype }} ${{ matrix.case.bulk && 'bulk' || 'rowbyrow' }}
@ -288,10 +290,11 @@ jobs:
- name: Save fuzzer-generated crash cases
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: Crash cases for ${{ steps.config.outputs.name }}
path: db/crash-*
if-no-files-found: ignore
- name: Save interesting cases
if: always()

View File

@ -1,2 +1,8 @@
option(CODECOVERAGE "Enable fuzzing of compression using Libfuzzer" OFF)
if(COMPRESSION_FUZZING)
add_compile_definitions(TS_COMPRESSION_FUZZING=1)
endif()
add_subdirectory(test)
add_subdirectory(src)

View File

@ -18,7 +18,7 @@ set(TSL_LIBRARY_NAME ${PROJECT_NAME}-tsl)
include(build-defs.cmake)
if(CMAKE_BUILD_TYPE MATCHES Debug)
if(CMAKE_BUILD_TYPE MATCHES Debug OR COMPRESSION_FUZZING)
add_library(${TSL_LIBRARY_NAME} MODULE
${SOURCES} $<TARGET_OBJECTS:${TSL_TESTS_LIB_NAME}>)
else()

View File

@ -3,8 +3,6 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/array.c
${CMAKE_CURRENT_SOURCE_DIR}/compression.c
${CMAKE_CURRENT_SOURCE_DIR}/compression_storage.c
${CMAKE_CURRENT_SOURCE_DIR}/compression_test.c
${CMAKE_CURRENT_SOURCE_DIR}/decompress_text_test_impl.c
${CMAKE_CURRENT_SOURCE_DIR}/create.c
${CMAKE_CURRENT_SOURCE_DIR}/datum_serialize.c
${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c

View File

@ -17,8 +17,11 @@
#include "compression/array.h"
#include "compression/compression.h"
#include "compression/simple8b_rle.h"
#include "compression/simple8b_rle_bitmap.h"
#include "datum_serialize.h"
#include "compression/arrow_c_data_interface.h"
/* A "compressed" array
* uint8 has_nulls: 1 iff this has a nulls bitmap stored before the data
* Oid element_type: the element stored by this array
@ -460,6 +463,148 @@ tsl_array_decompression_iterator_from_datum_reverse(Datum compressed_array, Oid
return &iterator->base;
}
#define ELEMENT_TYPE uint32
#include "simple8b_rle_decompress_all.h"
#undef ELEMENT_TYPE
/*
 * Bulk-decompress an array-compressed text column into an Arrow string array.
 *
 * compressed_array - the compressed datum (possibly toasted).
 * element_type     - must be TEXTOID; bulk decompression of the array
 *                    algorithm is only implemented for text.
 * dest_mctx        - memory context in which all result buffers are allocated.
 *
 * Validates the header and delegates the actual work to
 * text_array_decompress_all_serialized_no_header().
 */
ArrowArray *
tsl_text_array_decompress_all(Datum compressed_array, Oid element_type, MemoryContext dest_mctx)
{
	Assert(element_type == TEXTOID);
	/* Detoast first: the compressed data may be stored out-of-line. */
	void *compressed_data = PG_DETOAST_DATUM(compressed_array);
	StringInfoData si = { .data = compressed_data, .len = VARSIZE(compressed_data) };
	/* Consume and sanity-check the algorithm header before the payload. */
	ArrayCompressed *header = consumeCompressedData(&si, sizeof(ArrayCompressed));
	Assert(header->compression_algorithm == COMPRESSION_ALGORITHM_ARRAY);
	/*
	 * The stored element type comes from (potentially corrupt) compressed
	 * data, so it is a CheckCompressedData, not an Assert.
	 */
	CheckCompressedData(header->element_type == TEXTOID);
	return text_array_decompress_all_serialized_no_header(&si, header->has_nulls, dest_mctx);
}
/*
 * Decompress array-compressed text data, positioned after the ArrayCompressed
 * header, into the Arrow variable-length string layout:
 *   buffers[0] - validity bitmap (one bit per row, 1 = not null),
 *   buffers[1] - (n_total + 1) uint32 byte offsets into the bodies buffer,
 *   buffers[2] - concatenated string bodies without varlena headers.
 * All buffers are allocated in dest_mctx and padded to multiples of 64 bytes.
 */
ArrowArray *
text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,
											   MemoryContext dest_mctx)
{
	Simple8bRleSerialized *nulls_serialized = NULL;
	if (has_nulls)
	{
		nulls_serialized = bytes_deserialize_simple8b_and_advance(si);
	}
	Simple8bRleSerialized *sizes_serialized = bytes_deserialize_simple8b_and_advance(si);
	/*
	 * Decompress the per-element varlena sizes. The buffer is bounded by the
	 * maximum rows per compressed batch, which the decompressor enforces.
	 */
	uint32 sizes[GLOBAL_MAX_ROWS_PER_COMPRESSION];
	const uint16 n_notnull =
		simple8brle_decompress_all_buf_uint32(sizes_serialized,
											  sizes,
											  sizeof(sizes) / sizeof(sizes[0]));
	/* With nulls, the total row count comes from the nulls bitmap. */
	const int n_total = has_nulls ? nulls_serialized->num_elements : n_notnull;
	uint32 *offsets =
		(uint32 *) MemoryContextAllocZero(dest_mctx,
										  pad_to_multiple(64, sizeof(*offsets) * (n_total + 1)));
	/*
	 * The remaining compressed bytes are an upper bound on the total body
	 * size, because each stored element also carries a varlena header.
	 */
	uint8 *arrow_bodies =
		(uint8 *) MemoryContextAllocZero(dest_mctx, pad_to_multiple(64, si->len - si->cursor));
	uint32 offset = 0;
	for (int i = 0; i < n_notnull; i++)
	{
		void *vardata = consumeCompressedData(si, sizes[i]);
		/*
		 * Check for potentially corrupt varlena headers since we're reading them
		 * directly from compressed data. We can only have a plain datum
		 * with 1-byte or 4-byte header here, no TOAST or compressed data.
		 */
		CheckCompressedData(VARATT_IS_4B_U(vardata) ||
							(VARATT_IS_1B(vardata) && !VARATT_IS_1B_E(vardata)));
		/*
		 * Full varsize must be larger or equal than the header size so that the
		 * calculation of size without header doesn't overflow.
		 */
		CheckCompressedData((VARATT_IS_1B(vardata) && VARSIZE_1B(vardata) >= VARHDRSZ_SHORT) ||
							(VARSIZE_4B(vardata) >= VARHDRSZ));
		/* Varsize must match the size stored in the sizes array for this element. */
		CheckCompressedData(VARSIZE_ANY(vardata) == sizes[i]);
		/* Copy only the string body; Arrow stores no per-value headers. */
		const uint32 textlen = VARSIZE_ANY_EXHDR(vardata);
		memcpy(&arrow_bodies[offset], VARDATA_ANY(vardata), textlen);
		offsets[i] = offset;
		CheckCompressedData(offset <= offset + textlen); /* Check for overflow. */
		offset += textlen;
	}
	/* The final offset marks the end of the last element's body. */
	offsets[n_notnull] = offset;
	/* Validity bitmap rounded up to whole uint64 words; start all-valid. */
	const int validity_bitmap_bytes = sizeof(uint64) * (pad_to_multiple(64, n_total) / 64);
	uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
	memset(validity_bitmap, 0xFF, validity_bitmap_bytes);
	if (has_nulls)
	{
		/*
		 * We have decompressed the data with nulls skipped, reshuffle it
		 * according to the nulls bitmap.
		 */
		Simple8bRleBitmap nulls = simple8brle_bitmap_decompress(nulls_serialized);
		CheckCompressedData(n_notnull + simple8brle_bitmap_num_ones(&nulls) == n_total);
		/* Walk backwards, moving each non-null offset to its final row. */
		int current_notnull_element = n_notnull - 1;
		for (int i = n_total - 1; i >= 0; i--)
		{
			Assert(i >= current_notnull_element);
			/*
			 * The index of the corresponding offset is higher by one than
			 * the index of the element. The offset[0] is never affected by
			 * this shuffling and is always 0.
			 * Note that unlike the usual null reshuffling in other algorithms,
			 * for offsets, even if all elements are null, the starting offset
			 * is well-defined and we can do this assignment. This case is only
			 * accessible through fuzzing. Through SQL, all-null batches result
			 * in a null compressed value.
			 */
			Assert(current_notnull_element + 1 >= 0);
			offsets[i + 1] = offsets[current_notnull_element + 1];
			if (simple8brle_bitmap_get_at(&nulls, i))
			{
				arrow_set_row_validity(validity_bitmap, i, false);
			}
			else
			{
				Assert(current_notnull_element >= 0);
				current_notnull_element--;
			}
		}
		/* All non-null elements must have been placed. */
		Assert(current_notnull_element == -1);
	}
	else
	{
		/*
		 * The validity bitmap size is a multiple of 64 bits. Fill the tail bits
		 * with zeros, because the corresponding elements are not valid.
		 */
		if (n_total % 64)
		{
			const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
			validity_bitmap[n_total / 64] &= tail_mask;
		}
	}
	/* Allocate the ArrowArray with its three buffer pointers appended. */
	ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 3);
	const void **buffers = (const void **) &result[1];
	buffers[0] = validity_bitmap;
	buffers[1] = offsets;
	buffers[2] = arrow_bodies;
	result->n_buffers = 3;
	result->buffers = buffers;
	result->length = n_total;
	result->null_count = n_total - n_notnull;
	return result;
}
DecompressResult
array_decompression_iterator_try_next_reverse(DecompressionIterator *base_iter)
{

View File

@ -64,6 +64,12 @@ extern void array_compressed_send(CompressedDataHeader *header, StringInfo buffe
extern Datum tsl_array_compressor_append(PG_FUNCTION_ARGS);
extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS);
ArrowArray *tsl_text_array_decompress_all(Datum compressed_array, Oid element_type,
MemoryContext dest_mctx);
ArrowArray *text_array_decompress_all_serialized_no_header(StringInfo si, bool has_nulls,
MemoryContext dest_mctx);
#define ARRAY_ALGORITHM_DEFINITION \
{ \
.iterator_init_forward = tsl_array_decompression_iterator_from_datum_forward, \
@ -72,4 +78,5 @@ extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS);
.compressed_data_recv = array_compressed_recv, \
.compressor_for_type = array_compressor_for_type, \
.compressed_data_storage = TOAST_STORAGE_EXTENDED, \
.decompress_all = tsl_text_array_decompress_all, \
}

View File

@ -153,3 +153,10 @@ arrow_set_row_validity(uint64 *bitmap, size_t row_number, bool value)
Assert(arrow_row_is_valid(bitmap, row_number) == value);
}
/*
 * Round `source_value` up to the nearest multiple of `pad_to`.
 * A value that is already a multiple is returned unchanged.
 */
static inline uint64
pad_to_multiple(uint64 pad_to, uint64 source_value)
{
	const uint64 num_blocks = (source_value + pad_to - 1) / pad_to;
	return num_blocks * pad_to;
}

View File

@ -47,7 +47,6 @@
#include "array.h"
#include "chunk.h"
#include "compression.h"
#include "compression_test.h"
#include "create.h"
#include "custom_type_cache.h"
#include "debug_assert.h"
@ -133,6 +132,13 @@ tsl_get_decompress_all_function(CompressionAlgorithm algorithm, Oid type)
if (algorithm >= _END_COMPRESSION_ALGORITHMS)
elog(ERROR, "invalid compression algorithm %d", algorithm);
if (type != TEXTOID &&
(algorithm == COMPRESSION_ALGORITHM_DICTIONARY || algorithm == COMPRESSION_ALGORITHM_ARRAY))
{
/* Bulk decompression of array and dictionary is only supported for text. */
return NULL;
}
return definitions[algorithm].decompress_all;
}

View File

@ -404,3 +404,5 @@ consumeCompressedData(StringInfo si, int bytes)
* We use this limit for sanity checks in case the compressed data is corrupt.
*/
#define GLOBAL_MAX_ROWS_PER_COMPRESSION 1015
const CompressionAlgorithmDefinition *algorithm_definition(CompressionAlgorithm algo);

View File

@ -22,9 +22,11 @@
#include "compression/compression.h"
#include "compression/dictionary.h"
#include "compression/simple8b_rle.h"
#include "compression/simple8b_rle_bitmap.h"
#include "compression/array.h"
#include "compression/dictionary_hash.h"
#include "compression/datum_serialize.h"
#include "compression/arrow_c_data_interface.h"
/*
* A compression bitmap is stored as
@ -395,6 +397,117 @@ dictionary_decompression_iterator_init(DictionaryDecompressionIterator *iter, co
}
Assert(array_decompression_iterator_try_next_forward(dictionary_iterator).is_done);
}
#define ELEMENT_TYPE int16
#include "simple8b_rle_decompress_all.h"
#undef ELEMENT_TYPE
/*
 * Bulk-decompress a dictionary-compressed text column into a
 * dictionary-encoded Arrow array:
 *   buffers[0]          - validity bitmap,
 *   buffers[1]          - int16 indices into the dictionary,
 *   result->dictionary  - Arrow string array with the distinct values.
 * All buffers are allocated in dest_mctx.
 */
ArrowArray *
tsl_text_dictionary_decompress_all(Datum compressed, Oid element_type, MemoryContext dest_mctx)
{
	Assert(element_type == TEXTOID);
	/* Detoast first: the compressed data may be stored out-of-line. */
	compressed = PointerGetDatum(PG_DETOAST_DATUM(compressed));
	StringInfoData si = { .data = DatumGetPointer(compressed), .len = VARSIZE(compressed) };
	const DictionaryCompressed *header = consumeCompressedData(&si, sizeof(DictionaryCompressed));
	Assert(header->compression_algorithm == COMPRESSION_ALGORITHM_DICTIONARY);
	/* The stored element type comes from untrusted compressed data. */
	CheckCompressedData(header->element_type == TEXTOID);
	Simple8bRleSerialized *indices_serialized = bytes_deserialize_simple8b_and_advance(&si);
	Simple8bRleSerialized *nulls_serialized = NULL;
	if (header->has_nulls)
	{
		nulls_serialized = bytes_deserialize_simple8b_and_advance(&si);
	}
	const uint16 n_notnull = indices_serialized->num_elements;
	/* With nulls, the total row count comes from the nulls bitmap. */
	const uint16 n_total = header->has_nulls ? nulls_serialized->num_elements : n_notnull;
	const uint16 n_padded =
		n_total + 63; /* This is the padding requirement of simple8brle_decompress_all. */
	int16 *restrict indices = MemoryContextAlloc(dest_mctx, sizeof(int16) * n_padded);
	const uint16 n_decompressed =
		simple8brle_decompress_all_buf_int16(indices_serialized, indices, n_padded);
	CheckCompressedData(n_decompressed == n_notnull);
	/* Check that the dictionary indices that we've just read are not out of bounds. */
	CheckCompressedData(header->num_distinct <= GLOBAL_MAX_ROWS_PER_COMPRESSION);
	CheckCompressedData(header->num_distinct <= INT16_MAX);
	/*
	 * Branchlessly accumulate the bounds check over all indices and report
	 * corruption once, outside the loop.
	 * NOTE(review): this only rejects indices >= num_distinct; presumably the
	 * simple8b decompressor cannot produce negative int16 values here --
	 * confirm against simple8brle_decompress_all.
	 */
	bool have_incorrect_index = false;
	for (int i = 0; i < n_notnull; i++)
	{
		have_incorrect_index |= indices[i] >= (int16) header->num_distinct;
	}
	CheckCompressedData(!have_incorrect_index);
	/* Decompress the actual values in the dictionary. */
	ArrowArray *dict =
		text_array_decompress_all_serialized_no_header(&si, /* has_nulls = */ false, dest_mctx);
	CheckCompressedData(header->num_distinct == dict->length);
	/* Fill validity and indices of the array elements, reshuffling for nulls if needed. */
	const int validity_bitmap_bytes = sizeof(uint64) * pad_to_multiple(64, n_total) / 64;
	uint64 *restrict validity_bitmap = MemoryContextAlloc(dest_mctx, validity_bitmap_bytes);
	memset(validity_bitmap, 0xFF, validity_bitmap_bytes);
	if (header->has_nulls)
	{
		/*
		 * We have decompressed the data with nulls skipped, reshuffle it
		 * according to the nulls bitmap.
		 */
		Simple8bRleBitmap nulls = simple8brle_bitmap_decompress(nulls_serialized);
		CheckCompressedData(n_notnull + simple8brle_bitmap_num_ones(&nulls) == n_total);
		/* Walk backwards, moving each non-null index to its final row. */
		int current_notnull_element = n_notnull - 1;
		for (int i = n_total - 1; i >= 0; i--)
		{
			Assert(i >= current_notnull_element);
			if (simple8brle_bitmap_get_at(&nulls, i))
			{
				arrow_set_row_validity(validity_bitmap, i, false);
				/* Null rows get index 0 so the value is always in bounds. */
				indices[i] = 0;
			}
			else
			{
				Assert(current_notnull_element >= 0);
				indices[i] = indices[current_notnull_element];
				current_notnull_element--;
			}
		}
		/* All non-null elements must have been placed. */
		Assert(current_notnull_element == -1);
	}
	else
	{
		/*
		 * The validity bitmap size is a multiple of 64 bits. Fill the tail bits
		 * with zeros, because the corresponding elements are not valid.
		 */
		if (n_total % 64)
		{
			const uint64 tail_mask = -1ULL >> (64 - n_total % 64);
			validity_bitmap[n_total / 64] &= tail_mask;
		}
	}
	/* Allocate the ArrowArray with its two buffer pointers appended. */
	ArrowArray *result = MemoryContextAllocZero(dest_mctx, sizeof(ArrowArray) + sizeof(void *) * 2);
	const void **buffers = (const void **) &result[1];
	buffers[0] = validity_bitmap;
	buffers[1] = indices;
	result->n_buffers = 2;
	result->buffers = buffers;
	result->length = n_total;
	result->null_count = n_total - n_notnull;
	result->dictionary = dict;
	return result;
}
DecompressionIterator *
tsl_dictionary_decompression_iterator_from_datum_forward(Datum dictionary_compressed,
Oid element_type)

View File

@ -47,6 +47,12 @@ extern Datum dictionary_compressed_recv(StringInfo buf);
extern Datum tsl_dictionary_compressor_append(PG_FUNCTION_ARGS);
extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS);
ArrowArray *tsl_text_array_decompress_all(Datum compressed_array, Oid element_type,
MemoryContext dest_mctx);
ArrowArray *tsl_text_dictionary_decompress_all(Datum compressed, Oid element_type,
MemoryContext dest_mctx);
#define DICTIONARY_ALGORITHM_DEFINITION \
{ \
.iterator_init_forward = tsl_dictionary_decompression_iterator_from_datum_forward, \
@ -55,4 +61,5 @@ extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS);
.compressed_data_recv = dictionary_compressed_recv, \
.compressor_for_type = dictionary_compressor_for_type, \
.compressed_data_storage = TOAST_STORAGE_EXTENDED, \
.decompress_all = tsl_text_dictionary_decompress_all, \
}

View File

@ -10,6 +10,9 @@
/*
* Specialization of bulk simple8brle decompression for a data type specified by
* ELEMENT_TYPE macro.
*
* The buffer must have a padding of 63 elements after the last one, because
* decompression is performed always in full blocks.
*/
static uint16
FUNCTION_NAME(simple8brle_decompress_all_buf,

View File

@ -77,6 +77,23 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
return arrow;
}
/*
 * Return the byte length of the longest string in an Arrow text array,
 * computed from the differences of consecutive entries in the offsets
 * buffer (buffers[1]). Returns 0 for an empty array.
 */
static int
get_max_text_datum_size(ArrowArray *text_array)
{
	const uint32 *offsets = (const uint32 *) text_array->buffers[1];
	int result = 0;
	for (int row = 0; row < text_array->length; row++)
	{
		const int row_bytes = offsets[row + 1] - offsets[row];
		if (result < row_bytes)
		{
			result = row_bytes;
		}
	}
	return result;
}
static void
decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i)
{
@ -170,8 +187,37 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state
}
else
{
/* No variable-width columns support bulk decompression. */
Assert(false);
/*
* Text column. Pre-allocate memory for its text Datum in the
* decompressed scan slot. We can't put direct references to Arrow
* memory there, because it doesn't have the varlena headers that
* Postgres expects for text.
*/
const int maxbytes =
VARHDRSZ + (arrow->dictionary ? get_max_text_datum_size(arrow->dictionary) :
get_max_text_datum_size(arrow));
*column_values->output_value =
PointerGetDatum(MemoryContextAlloc(batch_state->per_batch_context, maxbytes));
/*
* Set up the datum conversion based on whether we use the dictionary.
*/
if (arrow->dictionary == NULL)
{
column_values->decompression_type = DT_ArrowText;
column_values->buffers[0] = arrow->buffers[0];
column_values->buffers[1] = arrow->buffers[1];
column_values->buffers[2] = arrow->buffers[2];
}
else
{
column_values->decompression_type = DT_ArrowTextDict;
column_values->buffers[0] = arrow->buffers[0];
column_values->buffers[1] = arrow->dictionary->buffers[1];
column_values->buffers[2] = arrow->dictionary->buffers[2];
column_values->buffers[3] = arrow->buffers[1];
}
}
}
@ -656,6 +702,21 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
MemoryContextSwitchTo(old_context);
}
/*
 * Materialize the text value at the given Arrow row as a Postgres varlena
 * datum, copying the string body from the Arrow buffers into the
 * pre-allocated output datum (sized for the longest value in the batch, see
 * decompress_column). buffers[1] holds the uint32 offsets, buffers[2] the
 * string bodies.
 */
static void
store_text_datum(CompressedColumnValues *column_values, int arrow_row)
{
	const uint32 start = ((uint32 *) column_values->buffers[1])[arrow_row];
	const int32 value_bytes = ((uint32 *) column_values->buffers[1])[arrow_row + 1] - start;
	Assert(value_bytes >= 0);
	/* The Postgres text datum needs a 4-byte varlena header before the body. */
	const int total_bytes = value_bytes + VARHDRSZ;
	Assert(DatumGetPointer(*column_values->output_value) != NULL);
	SET_VARSIZE(*column_values->output_value, total_bytes);
	memcpy(VARDATA(*column_values->output_value),
		   &((uint8 *) column_values->buffers[2])[start],
		   value_bytes);
}
/*
* Construct the next tuple in the decompressed scan slot.
* Doesn't check the quals.
@ -685,39 +746,47 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_com
*column_values->output_isnull = result.is_null;
*column_values->output_value = result.val;
}
else if (column_values->decompression_type > 0)
else if (column_values->decompression_type > SIZEOF_DATUM)
{
Assert(column_values->decompression_type <= 8);
/*
* Fixed-width by-reference type that doesn't fit into a Datum.
* For now this only happens for 8-byte types on 32-bit systems,
* but eventually we could also use it for bigger by-value types
* such as UUID.
*/
const uint8 value_bytes = column_values->decompression_type;
const char *restrict src = column_values->buffers[1];
*column_values->output_value = PointerGetDatum(&src[value_bytes * arrow_row]);
*column_values->output_isnull =
!arrow_row_is_valid(column_values->buffers[0], arrow_row);
}
else if (column_values->decompression_type > 0)
{
/*
* Fixed-width by-value type that fits into a Datum.
*
* The conversion of Datum to more narrow types will truncate
* the higher bytes, so we don't care if we read some garbage
* into them, and can always read 8 bytes. These are unaligned
* reads, so technically we have to do memcpy.
*/
uint64 value;
memcpy(&value, &src[value_bytes * arrow_row], 8);
#ifdef USE_FLOAT8_BYVAL
Datum datum = Int64GetDatum(value);
#else
/*
* On 32-bit systems, the data larger than 4 bytes go by
* reference, so we have to jump through these hoops.
*/
Datum datum;
if (value_bytes <= 4)
{
datum = Int32GetDatum((uint32) value);
}
else
{
datum = Int64GetDatum(value);
}
#endif
*column_values->output_value = datum;
const uint8 value_bytes = column_values->decompression_type;
Assert(value_bytes <= SIZEOF_DATUM);
const char *restrict src = column_values->buffers[1];
memcpy(column_values->output_value, &src[value_bytes * arrow_row], SIZEOF_DATUM);
*column_values->output_isnull =
!arrow_row_is_valid(column_values->buffers[0], arrow_row);
}
else if (column_values->decompression_type == DT_ArrowText)
{
store_text_datum(column_values, arrow_row);
*column_values->output_isnull =
!arrow_row_is_valid(column_values->buffers[0], arrow_row);
}
else if (column_values->decompression_type == DT_ArrowTextDict)
{
const int16 index = ((int16 *) column_values->buffers[3])[arrow_row];
store_text_datum(column_values, index);
*column_values->output_isnull =
!arrow_row_is_valid(column_values->buffers[0], arrow_row);
}

View File

@ -13,6 +13,8 @@ typedef struct ArrowArray ArrowArray;
/* How to obtain the decompressed datum for individual row. */
typedef enum
{
DT_ArrowTextDict = -4,
DT_ArrowText = -3,
DT_Default = -2,
DT_Iterator = -1,
DT_Invalid = 0,
@ -36,8 +38,10 @@ typedef struct CompressedColumnValues
* Depending on decompression type, they are as follows:
* iterator: iterator
* arrow fixed: validity, value
* arrow text: validity, uint32* offsets, void* bodies
* arrow dict text: validity, uint32* dict offsets, void* dict bodies, int16* indices
*/
const void *restrict buffers[2];
const void *restrict buffers[4];
/*
* The source arrow array, if any. We don't use it for building the

View File

@ -362,9 +362,14 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
CompressionColumnDescription *column = &dcontext->template_columns[i];
if (column->bulk_decompression_supported)
{
/* Values array, with 64 element padding (actually we have less). */
batch_memory_context_bytes +=
(GLOBAL_MAX_ROWS_PER_COMPRESSION + 64) * column->value_bytes;
/*
* Values array, with 64 element padding (actually we have less).
*
* For variable-length types (we only have text) we can't
* estimate the width currently.
*/
batch_memory_context_bytes += (GLOBAL_MAX_ROWS_PER_COMPRESSION + 64) *
(column->value_bytes > 0 ? column->value_bytes : 16);
/* Nulls bitmap, one uint64 per 64 rows. */
batch_memory_context_bytes +=
((GLOBAL_MAX_ROWS_PER_COMPRESSION + 63) / 64) * sizeof(uint64);

View File

@ -131,6 +131,6 @@ if(_install_checks)
add_dependencies(installcheck installcheck-t)
endif()
if(CMAKE_BUILD_TYPE MATCHES Debug)
if(CMAKE_BUILD_TYPE MATCHES Debug OR COMPRESSION_FUZZING)
add_subdirectory(src)
endif(CMAKE_BUILD_TYPE MATCHES Debug)
endif()

View File

@ -1588,34 +1588,37 @@ group by 2, 3 order by 1 desc
\set algo array
\set type text
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
count | rowbyrow_result
-------+-----------------
13 | XX001
4 | 08P01
3 | true
1 | false
1 | 22021
1 | 3F000
count | bulk_result | rowbyrow_result
-------+-------------+-----------------
21 | XX001 | XX001
6 | 08P01 | 08P01
2 | 3F000 | 3F000
2 | true | true
1 | 22021 | 22021
1 | false | false
(6 rows)
\set algo dictionary
\set type text
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
count | rowbyrow_result
-------+-----------------
22 | XX001
4 | 08P01
2 | true
1 | false
1 | 22021
1 | 3F000
(6 rows)
count | bulk_result | rowbyrow_result
-------+-------------+-----------------
51 | XX001 | XX001
4 | 08P01 | 08P01
4 | XX001 | true
2 | true | true
2 | 22021 | 22021
1 | 3F000 | 3F000
1 | false | false
(7 rows)

View File

@ -141,7 +141,7 @@ select * from arithmetic where
(1 row)
-- Test columns that don't support bulk decompression.
alter table vectorqual add column tag text;
alter table vectorqual add column tag name;
insert into vectorqual(ts, device, metric2, metric3, metric4, tag) values ('2025-01-01 00:00:00', 5, 52, 53, 54, 'tag5');
select count(compress_chunk(x, true)) from show_chunks('vectorqual') x;
NOTICE: chunk "_hyper_1_1_chunk" is already compressed

View File

@ -410,16 +410,18 @@ group by 2, 3 order by 1 desc
\set algo array
\set type text
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
\set algo dictionary
\set type text
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;

View File

@ -57,7 +57,7 @@ select * from arithmetic where
-- Test columns that don't support bulk decompression.
alter table vectorqual add column tag text;
alter table vectorqual add column tag name;
insert into vectorqual(ts, device, metric2, metric3, metric4, tag) values ('2025-01-01 00:00:00', 5, 52, 53, 54, 'tag5');
select count(compress_chunk(x, true)) from show_chunks('vectorqual') x;

View File

@ -1,5 +1,6 @@
set(SOURCES test_chunk_stats.c test_merge_chunk.c test_compression.c
test_continuous_agg.c)
set(SOURCES
test_chunk_stats.c test_merge_chunk.c compression_unit_test.c
compression_sql_test.c decompress_text_test_impl.c test_continuous_agg.c)
include(${PROJECT_SOURCE_DIR}/tsl/src/build-defs.cmake)

View File

@ -11,10 +11,9 @@
#include <funcapi.h>
#include <utils/builtins.h>
#include "compression_test.h"
#include "compression_sql_test.h"
#include "compression.h"
#include "arrow_c_data_interface.h"
#include "compression/arrow_c_data_interface.h"
#if !defined(NDEBUG) || defined(TS_COMPRESSION_FUZZING)
@ -75,7 +74,9 @@ get_compression_algorithm(char *name)
X(DELTADELTA, INT8, true) \
X(DELTADELTA, INT8, false) \
X(ARRAY, TEXT, false) \
X(DICTIONARY, TEXT, false)
X(ARRAY, TEXT, true) \
X(DICTIONARY, TEXT, false) \
X(DICTIONARY, TEXT, true)
static int (*get_decompress_fn(int algo, Oid type))(const uint8 *Data, size_t Size, bool bulk)
{

View File

@ -5,7 +5,7 @@
*/
#pragma once
#include "compression.h"
#include "compression/compression.h"
int decompress_ARRAY_TEXT(const uint8 *Data, size_t Size, bool bulk);

View File

@ -280,8 +280,8 @@ test_gorilla_float()
GorillaCompressor *compressor = gorilla_compressor_alloc();
GorillaCompressed *compressed;
DecompressionIterator *iter;
for (int i = 0.0; i < TEST_ELEMENTS; i++)
gorilla_compressor_append_value(compressor, float_get_bits((float) i));
for (int x = 0; x < TEST_ELEMENTS; x++)
gorilla_compressor_append_value(compressor, float_get_bits((float) x));
compressed = gorilla_compressor_finish(compressor);
TestAssertTrue(compressed != NULL);

View File

@ -7,24 +7,91 @@
#include <libpq/pqformat.h>
#include "compression.h"
#include "compression_sql_test.h"
#include "compression_test.h"
#include "compression/arrow_c_data_interface.h"
/*
 * Look up the string at the given row of an Arrow text array. Sets *str to
 * point directly into the Arrow bodies buffer and returns the length in
 * bytes. For dictionary-encoded arrays, resolves the index through the
 * dictionary recursively.
 */
static uint32
arrow_get_str(ArrowArray *arrow, int arrow_row, const char **str)
{
	if (arrow->dictionary)
	{
		/* buffers[1] holds int16 indices into the dictionary values. */
		const int16 dict_row = ((int16 *) arrow->buffers[1])[arrow_row];
		return arrow_get_str(arrow->dictionary, dict_row, str);
	}
	const uint32 *offsets = (uint32 *) arrow->buffers[1];
	const char *bodies = (char *) arrow->buffers[2];
	const uint32 body_start = offsets[arrow_row];
	const uint32 body_len = offsets[arrow_row + 1] - body_start;
	*str = &bodies[body_start];
	return body_len;
}
/*
 * Cross-check the bulk (Arrow) decompression result against the row-by-row
 * decompression results for the same compressed text data. Any mismatch in
 * length, null flags, or string contents is reported at `errorlevel`
 * (ERROR while fuzzing arbitrary inputs, PANIC for known-good data).
 */
static void
decompress_generic_text_check_arrow(ArrowArray *arrow, int errorlevel, DecompressResult *results,
									int n)
{
	/* Check that both ways of decompression match. */
	if (n != arrow->length)
	{
		ereport(errorlevel,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("the bulk decompression result does not match"),
				 errdetail("Expected %d elements, got %d.", n, (int) arrow->length)));
	}
	for (int i = 0; i < n; i++)
	{
		/* Null flags must agree row by row. */
		const bool arrow_isnull = !arrow_row_is_valid(arrow->buffers[0], i);
		if (arrow_isnull != results[i].is_null)
		{
			ereport(errorlevel,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("the bulk decompression result does not match"),
					 errdetail("Expected null %d, got %d at row %d.",
							   results[i].is_null,
							   arrow_isnull,
							   i)));
		}
		if (!results[i].is_null)
		{
			/* Compare the Arrow string body against the row-by-row varlena. */
			const char *arrow_cstring;
			size_t arrow_len = arrow_get_str(arrow, i, &arrow_cstring);
			const Datum rowbyrow_varlena = results[i].val;
			const size_t rowbyrow_len = VARSIZE_ANY_EXHDR(rowbyrow_varlena);
			const char *rowbyrow_cstring = VARDATA_ANY(rowbyrow_varlena);
			if (rowbyrow_len != arrow_len)
			{
				ereport(errorlevel,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("the bulk decompression result does not match"),
						 errdetail("At row %d\n", i)));
			}
			/* Lengths are equal here, so a bounded comparison suffices. */
			if (strncmp(arrow_cstring, rowbyrow_cstring, rowbyrow_len) != 0)
			{
				ereport(errorlevel,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("the bulk decompression result does not match"),
						 errdetail("At row %d\n", i)));
			}
		}
	}
}
/*
* Try to decompress the given compressed data. Used for fuzzing and for checking
* the examples found by fuzzing. For fuzzing we do less checks to keep it
* faster and the coverage space smaller. This is a generic implementation
* for arithmetic types.
* Try to decompress the given compressed data.
*/
static int
decompress_generic_text(const uint8 *Data, size_t Size, bool bulk, int requested_algo)
{
if (bulk)
{
elog(ERROR, "bulk decompression not supported for text");
}
StringInfoData si = { .data = (char *) Data, .len = Size };
const int data_algo = pq_getmsgbyte(&si);
@ -40,9 +107,19 @@ decompress_generic_text(const uint8 *Data, size_t Size, bool bulk, int requested
*/
return -1;
}
const CompressionAlgorithmDefinition *def = algorithm_definition(data_algo);
Datum compressed_data = def->compressed_data_recv(&si);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(data_algo, TEXTOID);
ArrowArray *arrow = NULL;
if (bulk)
{
/*
* Check that the arrow decompression works. Have to do this before the
* row-by-row decompression so that it doesn't hide the possible errors.
*/
arrow = decompress_all(compressed_data, TEXTOID, CurrentMemoryContext);
}
/*
* Test row-by-row decompression.
@ -60,17 +137,20 @@ decompress_generic_text(const uint8 *Data, size_t Size, bool bulk, int requested
results[n++] = r;
}
/*
* For row-by-row decompression, check that the result is still the same
* after we compress and decompress back.
* Don't perform this check for other types of tests.
*/
if (bulk)
{
/*
* Check that the arrow decompression result matches.
*/
decompress_generic_text_check_arrow(arrow, ERROR, results, n);
return n;
}
/*
* For row-by-row decompression, check that the result is still the same
* after we compress and decompress back.
* Don't perform this check for other types of tests.
*
* 1) Compress.
*/
Compressor *compressor = def->compressor_for_type(TEXTOID);
@ -136,6 +216,13 @@ decompress_generic_text(const uint8 *Data, size_t Size, bool bulk, int requested
nn++;
}
/*
* 3) The bulk decompression must absolutely work on the correct compressed
* data we've just generated.
*/
arrow = decompress_all(compressed_data, TEXTOID, CurrentMemoryContext);
decompress_generic_text_check_arrow(arrow, PANIC, results, n);
return n;
}