More tests for decompression

Add separate testing for bulk and row-by-row decompression, so that the
errors in one don't mask the errors in the other.

Also add fuzzing for row-by-row decompression, including for text columns.
Alexander Kuzmenkov 2023-12-20 11:44:21 +01:00
parent 1797f8ec32
commit 4f2f6585d3
71 changed files with 1203 additions and 677 deletions

View File

@ -9,17 +9,14 @@ name: Libfuzzer
- prerelease_test
- trigger/libfuzzer
pull_request:
paths: .github/workflows/libfuzzer.yaml
paths:
- .github/workflows/libfuzzer.yaml
- 'tsl/test/fuzzing/compression/**'
jobs:
fuzz:
strategy:
fail-fast: false
matrix:
case: [ { algo: gorilla, type: float8 }, { algo: deltadelta, type: int8 } ]
name: Fuzz decompression ${{ matrix.case.algo }} ${{ matrix.case.type }}
build:
runs-on: ubuntu-22.04
name: Build PostgreSQL and TimescaleDB
env:
PG_SRC_DIR: pgbuild
PG_INSTALL_DIR: postgresql
@ -30,7 +27,7 @@ jobs:
# Don't add ddebs here because the ddebs mirror is always 503 Service Unavailable.
# If needed, install them before opening the core dump.
sudo apt-get update
sudo apt-get install clang lld llvm flex bison lcov systemd-coredump gdb libipc-run-perl \
sudo apt-get install 7zip clang lld llvm flex bison libipc-run-perl \
libtest-most-perl tree
- name: Checkout TimescaleDB
@ -68,7 +65,7 @@ jobs:
CC=clang ./configure --prefix=$HOME/$PG_INSTALL_DIR --with-openssl \
--without-readline --without-zlib --without-libxml --enable-cassert \
--enable-debug CC=clang \
CFLAGS="-DTS_COMPRESSION_FUZZING=1 -fuse-ld=lld -ggdb3 -Og -fno-omit-frame-pointer"
CFLAGS="-DTS_COMPRESSION_FUZZING=1 -fuse-ld=lld -ggdb3 -O2 -fno-omit-frame-pointer"
make -j$(nproc)
- name: Install PostgreSQL
@ -89,13 +86,68 @@ jobs:
export LIBFUZZER_PATH=$(dirname "$(find $(llvm-config --libdir) -name libclang_rt.fuzzer_no_main-x86_64.a | head -1)")
# Some pointers for the next time we have linking/undefined symbol problems:
# http://web.archive.org/web/20200926071757/https://github.com/google/sanitizers/issues/111
# http://web.archive.org/web/20231101091231/https://github.com/cms-sw/cmssw/issues/40680
cmake -B build -S . -DASSERTIONS=ON -DLINTER=OFF -DCMAKE_VERBOSE_MAKEFILE=1 \
-DWARNINGS_AS_ERRORS=1 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=clang \
-DWARNINGS_AS_ERRORS=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_C_COMPILER=clang \
-DCMAKE_C_FLAGS="-fsanitize=fuzzer-no-link -lstdc++ -L$LIBFUZZER_PATH -l:libclang_rt.fuzzer_no_main-x86_64.a -static-libsan" \
-DPG_PATH=$HOME/$PG_INSTALL_DIR
make -C build -j$(nproc) install
# Incredibly, the upload-artifact action can't preserve executable permissions:
# https://github.com/actions/upload-artifact/issues/38
# It's also extremely slow.
- name: Compress the installation directory
run: 7z a install.7z $HOME/$PG_INSTALL_DIR
- name: Save the installation directory
uses: actions/upload-artifact@v3
with:
name: fuzzing-install-dir
path: install.7z
if-no-files-found: error
retention-days: 1
fuzz:
needs: build
strategy:
fail-fast: false
matrix:
case: [
{ algo: gorilla , pgtype: float8, bulk: false, runs: 500000000 },
{ algo: deltadelta, pgtype: int8 , bulk: false, runs: 500000000 },
{ algo: gorilla , pgtype: float8, bulk: true , runs: 1000000000 },
{ algo: deltadelta, pgtype: int8 , bulk: true , runs: 1000000000 },
# array has a peculiar recv function that recompresses all input, so
# fuzzing it is much slower. The dictionary recv also uses it.
{ algo: array , pgtype: text , bulk: false, runs: 10000000 },
{ algo: dictionary, pgtype: text , bulk: false, runs: 100000000 },
]
name: Fuzz decompression ${{ matrix.case.algo }} ${{ matrix.case.pgtype }} ${{ matrix.case.bulk && 'bulk' || 'rowbyrow' }}
runs-on: ubuntu-22.04
env:
PG_SRC_DIR: pgbuild
PG_INSTALL_DIR: postgresql
steps:
- name: Install Linux dependencies
run: sudo apt install 7zip systemd-coredump gdb
- name: Checkout TimescaleDB
uses: actions/checkout@v3
- name: Download the installation directory
uses: actions/download-artifact@v3
with:
name: fuzzing-install-dir
- name: Unpack the installation directory
run: 7z x -o$HOME install.7z
- name: initdb
run: |
# Have to do this before initializing the corpus, or initdb will complain.
@ -108,24 +160,37 @@ jobs:
initdb
echo "shared_preload_libraries = 'timescaledb'" >> $PGDATA/postgresql.conf
- name: Restore the cached fuzzing corpus
id: restore-corpus-cache
- name: Set configuration
id: config
run: |
set -x
echo "cache_prefix=${{ format('libfuzzer-corpus-2-{0}-{1}', matrix.case.algo, matrix.case.pgtype) }}" >> $GITHUB_OUTPUT
echo "name=${{ matrix.case.algo }} ${{ matrix.case.pgtype }} ${{ matrix.case.bulk && 'bulk' || 'rowbyrow' }}" >> $GITHUB_OUTPUT
- name: Restore the cached fuzzing corpus (bulk)
id: restore-corpus-cache-bulk
uses: actions/cache/restore@v3
with:
path: db/corpus
# If the initial corpus changes, probably it was updated by hand with
# some important examples, and it makes sense to start anew from it.
key: "libfuzzer-corpus-2-${{ matrix.case.algo }}-${{ matrix.case.type }}-\
${{ hashFiles(format('tsl/test/fuzzing/compression/{0}-{1}', matrix.case.algo, matrix.case.type)) }}"
path: db/corpus-bulk
key: "${{ steps.config.outputs.cache_prefix }}-bulk"
# We save the row-by-row corpus separately from the bulk corpus, so that
# they don't overwrite each other. Now we are going to combine them.
- name: Restore the cached fuzzing corpus (rowbyrow)
id: restore-corpus-cache-rowbyrow
uses: actions/cache/restore@v3
with:
path: db/corpus-rowbyrow
key: "${{ steps.config.outputs.cache_prefix }}-rowbyrow"
- name: Initialize the fuzzing corpus
# cache-hit is only true for exact key matches, and we use prefix matches.
if: steps.restore-corpus-cache.outputs.cache-matched-key == ''
run: |
# Copy the initial corpus files from the repository. The github actions
# cache doesn't follow symlinks.
mkdir -p db/corpus
find "tsl/test/fuzzing/compression/${{ matrix.case.algo }}-${{ matrix.case.type }}" -type f -exec cp -t db/corpus {} +
# Combine the cached corpus from rowbyrow and bulk fuzzing, and from repository.
mkdir -p db/corpus{,-rowbyrow,-bulk}
find "tsl/test/fuzzing/compression/${{ matrix.case.algo }}-${{ matrix.case.pgtype }}" -type f -exec cp -n -t db/corpus {} +
find "db/corpus-rowbyrow" -type f -exec cp -n -t db/corpus {} +
find "db/corpus-bulk" -type f -exec cp -n -t db/corpus {} +
ls db/corpus | wc -l
- name: Run libfuzzer for compression
run: |
@ -135,25 +200,36 @@ jobs:
export PGPORT=5432
export PGDATABASE=postgres
export PATH=$HOME/$PG_INSTALL_DIR/bin:$PATH
pg_ctl -l postmaster.log start
pg_ctl -l postgres.log start
psql -c "create extension timescaledb;"
# Create the fuzzing function
# Create the fuzzing functions
export MODULE_NAME=$(basename $(find $HOME/$PG_INSTALL_DIR -name "timescaledb-tsl-*.so"))
psql -a -c "create or replace function fuzz(algo cstring, type regtype, runs int) returns int as '"$MODULE_NAME"', 'ts_fuzz_compression' language c;"
psql -a -c "create or replace function fuzz(algo cstring, pgtype regtype,
bulk bool, runs int)
returns int as '"$MODULE_NAME"', 'ts_fuzz_compression' language c;
create or replace function ts_read_compressed_data_directory(algo cstring,
pgtype regtype, path cstring, bulk bool)
returns table(path text, bytes int, rows int, sqlstate text, location text)
as '"$MODULE_NAME"', 'ts_read_compressed_data_directory' language c;
"
# Start more fuzzing processes in the background. We won't even monitor
# their progress, because the server will panic if they find an error.
for x in {2..$(nproc)}
do
psql -v ON_ERROR_STOP=1 -c "select fuzz('${{ matrix.case.algo }}', '${{ matrix.case.type }}', 100000000);" &
psql -v ON_ERROR_STOP=1 -c "select fuzz('${{ matrix.case.algo }}',
'${{ matrix.case.pgtype }}', '${{ matrix.case.bulk }}', ${{ matrix.case.runs }});" &
done
# Start the one fuzzing process that we will monitor, in foreground.
# The LLVM fuzzing driver calls exit(), so we expect to lose the connection.
ret=0
psql -v ON_ERROR_STOP=1 -c "select fuzz('${{ matrix.case.algo }}', '${{ matrix.case.type }}', 100000000);" || ret=$?
psql -v ON_ERROR_STOP=1 -c "select fuzz('${{ matrix.case.algo }}',
'${{ matrix.case.pgtype }}', '${{ matrix.case.bulk }}', ${{ matrix.case.runs }});" || ret=$?
if ! [ $ret -eq 2 ]
then
>&2 echo "Unexpected psql exit code $ret"
@ -163,11 +239,38 @@ jobs:
# Check that the server is still alive.
psql -c "select 1"
ls db/corpus | wc -l
fn="ts_read_compressed_data_directory('${{ matrix.case.algo }}',
'${{ matrix.case.pgtype }}', 'corpus', '${{ matrix.case.bulk }}')"
# Show statistics about the fuzzing corpus
psql -c "select count(*), location, min(sqlstate), min(path)
from $fn
group by location order by count(*) desc
"
# Save interesting cases because the caches are not available for download from the UI
mkdir -p interesting
psql -qtAX -c "select distinct on (location) 'db/' || path from $fn
order by location, bytes
" | xargs cp -t interesting
# Check that we don't have any internal errors
errors=$(psql -qtAX --set=ON_ERROR_STOP=1 -c "select count(*)
from $fn
where sqlstate = 'XX000'")
echo "Internal program errors: $errors"
[ $errors -eq 0 ] || exit 1
# Shouldn't have any WARNINGS in the log.
! grep -F "] WARNING: " postgres.log
- name: Collect the logs
if: always()
id: collectlogs
run: |
find . -name postmaster.log -exec cat {} + > postgres.log
# wait in case there are in-progress coredumps
sleep 10
if coredumpctl -q list >/dev/null; then echo "coredumps=true" >>$GITHUB_OUTPUT; fi
@ -178,26 +281,52 @@ jobs:
if: always()
uses: actions/upload-artifact@v3
with:
name: PostgreSQL log for ${{ matrix.case.algo }} ${{ matrix.case.type }}
name: PostgreSQL log for ${{ steps.config.outputs.name }}
path: postgres.log
- name: Save fuzzer-generated crash cases
if: always()
uses: actions/upload-artifact@v3
with:
name: Crash cases for ${{ matrix.case.algo }} ${{ matrix.case.type }}
name: Crash cases for ${{ steps.config.outputs.name }}
path: db/crash-*
- name: Save interesting cases
if: always()
uses: actions/upload-artifact@v3
with:
name: Interesting cases for ${{ steps.config.outputs.name }}
path: interesting/
# We use separate restore/save actions, because the default action won't
# save the updated folder after the cache hit. We also can't overwrite the
# existing cache, so we add a unique suffix. The cache is matched by key
# prefix, not exact key, and picks the newest matching item, so this works.
# save the updated folder after the cache hit. We also want to save the
# cache after fuzzing errors, and the default action doesn't save after
# errors.
# We can't overwrite the existing cache, so we add a unique suffix. The
# cache is matched by key prefix, not exact key, and picks the newest
# matching item, so this works.
# The caches for rowbyrow and bulk fuzzing are saved separately, otherwise
# the slower job would always overwrite the cache from the faster one. We
# want to combine corpuses from bulk and rowbyrow fuzzing for better
# coverage.
# Note that a cache cannot be restored to a path different from the one it
# was saved from. To make our lives more interesting, this is not documented
# directly anywhere, but can be deduced from the fact that the path
# influences the cache version.
- name: Change corpus path to please the 'actions/cache' GitHub Action
if: always()
run: |
rm -rf db/corpus-{bulk,rowbyrow} ||:
mv -fT db/corpus{,-${{ matrix.case.bulk && 'bulk' || 'rowbyrow' }}}
- name: Save fuzzer corpus
if: always()
uses: actions/cache/save@v3
with:
path: db/corpus
key: "${{ format('{0}-{1}-{2}',
steps.restore-corpus-cache.outputs.cache-primary-key,
path: db/corpus-${{ matrix.case.bulk && 'bulk' || 'rowbyrow' }}
key: "${{ format('{0}-{1}-{2}-{3}',
steps.config.outputs.cache_prefix,
matrix.case.bulk && 'bulk' || 'rowbyrow',
github.run_id, github.run_attempt) }}"
- name: Stack trace
@ -224,5 +353,5 @@ jobs:
if: always() && steps.collectlogs.outputs.coredumps == 'true'
uses: actions/upload-artifact@v3
with:
name: Coredumps for ${{ matrix.case.algo }} ${{ matrix.case.type }}
name: Coredumps for ${{ steps.config.outputs.name }}
path: coredumps

View File

@ -347,8 +347,6 @@ bit_array_append_bucket(BitArray *array, uint8 bits_used, uint64 bucket)
static uint64
bit_array_low_bits_mask(uint8 bits_used)
{
if (bits_used >= 64)
return PG_UINT64_MAX;
else
return (UINT64CONST(1) << bits_used) - UINT64CONST(1);
Assert(bits_used > 0);
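/*
 * bits_used must be in [1, 64]: shifting a 64-bit value by 64 or more would
 * be undefined behavior, hence the assert; bits_used == 64 shifts by zero
 * and yields an all-ones mask.
 */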
return -1ULL >> (64 - bits_used);
}

View File

@ -2,6 +2,8 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/api.c
${CMAKE_CURRENT_SOURCE_DIR}/array.c
${CMAKE_CURRENT_SOURCE_DIR}/compression.c
${CMAKE_CURRENT_SOURCE_DIR}/compression_test.c
${CMAKE_CURRENT_SOURCE_DIR}/decompress_text_test_impl.c
${CMAKE_CURRENT_SOURCE_DIR}/create.c
${CMAKE_CURRENT_SOURCE_DIR}/datum_serialize.c
${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c

View File

@ -240,6 +240,7 @@ array_compression_serialization_size(ArrayCompressorSerializationInfo *info)
uint32
array_compression_serialization_num_elements(ArrayCompressorSerializationInfo *info)
{
CheckCompressedData(info->sizes != NULL);
return info->sizes->num_elements;
}
@ -405,12 +406,12 @@ array_decompression_iterator_try_next_forward(DecompressionIterator *general_ite
.is_done = true,
};
Assert(iter->data_offset + datum_size.val <= iter->num_data_bytes);
CheckCompressedData(iter->data_offset + datum_size.val <= iter->num_data_bytes);
start_pointer = iter->data + iter->data_offset;
val = bytes_to_datum_and_advance(iter->deserializer, &start_pointer);
iter->data_offset += datum_size.val;
Assert(iter->data + iter->data_offset == start_pointer);
CheckCompressedData(iter->data + iter->data_offset == start_pointer);
return (DecompressResult){
.val = val,
@ -602,7 +603,6 @@ array_compressed_data_send(StringInfo buffer, const char *_serialized_data, Size
Datum
array_compressed_recv(StringInfo buffer)
{
ArrayCompressorSerializationInfo *data;
uint8 has_nulls;
Oid element_type;
@ -611,9 +611,12 @@ array_compressed_recv(StringInfo buffer)
element_type = binary_string_get_type(buffer);
data = array_compressed_data_recv(buffer, element_type);
ArrayCompressorSerializationInfo *info = array_compressed_data_recv(buffer, element_type);
PG_RETURN_POINTER(array_compressed_from_serialization_info(data, element_type));
CheckCompressedData(info->sizes != NULL);
CheckCompressedData(has_nulls == (info->nulls != NULL));
PG_RETURN_POINTER(array_compressed_from_serialization_info(info, element_type));
}
void

View File

@ -18,7 +18,6 @@
#include <common/base64.h>
#include <executor/nodeIndexscan.h>
#include <executor/tuptable.h>
#include <funcapi.h>
#include <libpq/pqformat.h>
#include <miscadmin.h>
#include <nodes/nodeFuncs.h>
@ -42,16 +41,15 @@
#include <utils/tuplesort.h>
#include <utils/typcache.h>
#include <replication/message.h>
#include <math.h>
#include "compat/compat.h"
#include "array.h"
#include "chunk.h"
#include "compression.h"
#include "compression_test.h"
#include "create.h"
#include "custom_type_cache.h"
#include "arrow_c_data_interface.h"
#include "debug_assert.h"
#include "debug_point.h"
#include "deltadelta.h"
@ -2219,365 +2217,12 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
table_close(in_rel, NoLock);
}
#if !defined(NDEBUG) || defined(TS_COMPRESSION_FUZZING)
static int
get_compression_algorithm(char *name)
const CompressionAlgorithmDefinition *
algorithm_definition(CompressionAlgorithm algo)
{
if (pg_strcasecmp(name, "deltadelta") == 0)
{
return COMPRESSION_ALGORITHM_DELTADELTA;
Assert(algo > 0 && algo < _END_COMPRESSION_ALGORITHMS);
return &definitions[algo];
}
else if (pg_strcasecmp(name, "gorilla") == 0)
{
return COMPRESSION_ALGORITHM_GORILLA;
}
ereport(ERROR, (errmsg("unknown comrpession algorithm %s", name)));
return _INVALID_COMPRESSION_ALGORITHM;
}
#define ALGO gorilla
#define CTYPE float8
#define PGTYPE FLOAT8OID
#define DATUM_TO_CTYPE DatumGetFloat8
#include "decompress_test_impl.c"
#undef ALGO
#undef CTYPE
#undef PGTYPE
#undef DATUM_TO_CTYPE
#define ALGO deltadelta
#define CTYPE int64
#define PGTYPE INT8OID
#define DATUM_TO_CTYPE DatumGetInt64
#include "decompress_test_impl.c"
#undef ALGO
#undef CTYPE
#undef PGTYPE
#undef DATUM_TO_CTYPE
static int (*get_decompress_fn(int algo, Oid type))(const uint8 *Data, size_t Size,
bool extra_checks)
{
if (algo == COMPRESSION_ALGORITHM_GORILLA && type == FLOAT8OID)
{
return decompress_gorilla_float8;
}
else if (algo == COMPRESSION_ALGORITHM_DELTADELTA && type == INT8OID)
{
return decompress_deltadelta_int64;
}
elog(ERROR,
"no decompression function for compression algorithm %d with element type %d",
algo,
type);
pg_unreachable();
}
/*
* Read and decompress compressed data from file. Useful for debugging the
* results of fuzzing.
* The out parameter bytes is volatile because we want to fill it even
* if we error out later.
*/
static void
read_compressed_data_file_impl(int algo, Oid type, const char *path, volatile int *bytes, int *rows)
{
FILE *f = fopen(path, "r");
if (!f)
{
elog(ERROR, "could not open the file '%s'", path);
}
fseek(f, 0, SEEK_END);
const size_t fsize = ftell(f);
fseek(f, 0, SEEK_SET); /* same as rewind(f); */
*rows = 0;
*bytes = fsize;
if (fsize == 0)
{
/*
* Skip empty data, because we'll just get "no data left in message"
* right away.
*/
return;
}
char *string = palloc(fsize + 1);
size_t elements_read = fread(string, fsize, 1, f);
if (elements_read != 1)
{
elog(ERROR, "failed to read file '%s'", path);
}
fclose(f);
string[fsize] = 0;
*rows = get_decompress_fn(algo, type)((const uint8 *) string, fsize, /* extra_checks = */ true);
}
TS_FUNCTION_INFO_V1(ts_read_compressed_data_file);
/* Read and decompress compressed data from file -- SQL-callable wrapper. */
Datum
ts_read_compressed_data_file(PG_FUNCTION_ARGS)
{
int rows;
int bytes;
read_compressed_data_file_impl(get_compression_algorithm(PG_GETARG_CSTRING(0)),
PG_GETARG_OID(1),
PG_GETARG_CSTRING(2),
&bytes,
&rows);
PG_RETURN_INT32(rows);
}
TS_FUNCTION_INFO_V1(ts_read_compressed_data_directory);
/*
* Read and decompress all compressed data files from a directory. Useful for
* checking the fuzzing corpuses in the regression tests.
*/
Datum
ts_read_compressed_data_directory(PG_FUNCTION_ARGS)
{
/* Output columns of this function. */
enum
{
out_path = 0,
out_bytes,
out_rows,
out_sqlstate,
out_location,
_out_columns
};
/* Cross-call context for this set-returning function. */
struct user_context
{
DIR *dp;
struct dirent *ep;
};
char *name = PG_GETARG_CSTRING(2);
const int algo = get_compression_algorithm(PG_GETARG_CSTRING(0));
FuncCallContext *funcctx;
struct user_context *c;
MemoryContext call_memory_context = CurrentMemoryContext;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &funcctx->tuple_desc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
/*
* generate attribute metadata needed later to produce tuples from raw
* C strings
*/
funcctx->attinmeta = TupleDescGetAttInMetadata(funcctx->tuple_desc);
funcctx->user_fctx = palloc(sizeof(struct user_context));
c = funcctx->user_fctx;
c->dp = opendir(name);
if (!c->dp)
{
elog(ERROR, "could not open directory '%s'", name);
}
MemoryContextSwitchTo(call_memory_context);
}
funcctx = SRF_PERCALL_SETUP();
c = (struct user_context *) funcctx->user_fctx;
Datum values[_out_columns] = { 0 };
bool nulls[_out_columns] = { 0 };
for (int i = 0; i < _out_columns; i++)
{
nulls[i] = true;
}
while ((c->ep = readdir(c->dp)))
{
if (c->ep->d_name[0] == '.')
{
continue;
}
char *path = psprintf("%s/%s", name, c->ep->d_name);
/* The return values are: path, ret, sqlstate, status, location. */
values[out_path] = PointerGetDatum(cstring_to_text(path));
nulls[out_path] = false;
int rows;
volatile int bytes = 0;
PG_TRY();
{
read_compressed_data_file_impl(algo, PG_GETARG_OID(1), path, &bytes, &rows);
values[out_rows] = Int32GetDatum(rows);
nulls[out_rows] = false;
}
PG_CATCH();
{
MemoryContextSwitchTo(call_memory_context);
ErrorData *error = CopyErrorData();
values[out_sqlstate] =
PointerGetDatum(cstring_to_text(unpack_sql_state(error->sqlerrcode)));
nulls[out_sqlstate] = false;
if (error->filename)
{
values[out_location] = PointerGetDatum(
cstring_to_text(psprintf("%s:%d", error->filename, error->lineno)));
nulls[out_location] = false;
}
FlushErrorState();
}
PG_END_TRY();
values[out_bytes] = Int32GetDatum(bytes);
nulls[out_bytes] = false;
SRF_RETURN_NEXT(funcctx,
HeapTupleGetDatum(heap_form_tuple(funcctx->tuple_desc, values, nulls)));
}
(void) closedir(c->dp);
SRF_RETURN_DONE(funcctx);
}
#endif
#ifdef TS_COMPRESSION_FUZZING
/*
* This is our test function that will be called by the libfuzzer driver. It
* has to catch the postgres exceptions normally produced for corrupt data.
*/
static int
llvm_fuzz_target_generic(int (*target)(const uint8_t *Data, size_t Size, bool extra_checks),
const uint8_t *Data, size_t Size)
{
MemoryContextReset(CurrentMemoryContext);
PG_TRY();
{
CHECK_FOR_INTERRUPTS();
target(Data, Size, /* extra_checks = */ false);
}
PG_CATCH();
{
FlushErrorState();
}
PG_END_TRY();
/* We always return 0, and -1 would mean "don't include it into corpus". */
return 0;
}
static int
llvm_fuzz_target_gorilla_float8(const uint8_t *Data, size_t Size)
{
return llvm_fuzz_target_generic(decompress_gorilla_float8, Data, Size);
}
static int
llvm_fuzz_target_deltadelta_int64(const uint8_t *Data, size_t Size)
{
return llvm_fuzz_target_generic(decompress_deltadelta_int64, Data, Size);
}
/*
* libfuzzer fuzzing driver that we import from LLVM libraries. It will run our
* test functions with random inputs.
*/
extern int LLVMFuzzerRunDriver(int *argc, char ***argv,
int (*UserCb)(const uint8_t *Data, size_t Size));
/*
* The SQL function to perform fuzzing.
*/
TS_FUNCTION_INFO_V1(ts_fuzz_compression);
Datum
ts_fuzz_compression(PG_FUNCTION_ARGS)
{
/*
* We use the memory context size larger than default here, so that all data
* allocated by fuzzing fit into the first chunk. The first chunk is not
* deallocated when the memory context is reset, so this reduces overhead
* caused by repeated reallocations.
* The particular value of 8MB is somewhat arbitrary and large. In practice,
* we have inputs of 1k rows max here, which decompress to 8 kB max.
*/
MemoryContext fuzzing_context =
AllocSetContextCreate(CurrentMemoryContext, "fuzzing", 0, 8 * 1024 * 1024, 8 * 1024 * 1024);
MemoryContext old_context = MemoryContextSwitchTo(fuzzing_context);
char *argvdata[] = { "PostgresFuzzer",
"-timeout=1",
"-report_slow_units=1",
// "-use_value_profile=1",
"-reload=1",
//"-print_coverage=1",
//"-print_full_coverage=1",
//"-print_final_stats=1",
//"-help=1",
psprintf("-runs=%d", PG_GETARG_INT32(2)),
"corpus" /* in the database directory */,
NULL };
char **argv = argvdata;
int argc = sizeof(argvdata) / sizeof(*argvdata) - 1;
int algo = get_compression_algorithm(PG_GETARG_CSTRING(0));
Oid type = PG_GETARG_OID(1);
int (*target)(const uint8_t *, size_t);
if (algo == COMPRESSION_ALGORITHM_GORILLA && type == FLOAT8OID)
{
target = llvm_fuzz_target_gorilla_float8;
}
else if (algo == COMPRESSION_ALGORITHM_DELTADELTA && type == INT8OID)
{
target = llvm_fuzz_target_deltadelta_int64;
}
else
{
elog(ERROR, "no llvm fuzz target for compression algorithm %d and type %d", algo, type);
}
int res = LLVMFuzzerRunDriver(&argc, &argv, target);
MemoryContextSwitchTo(old_context);
PG_RETURN_INT32(res);
}
#endif
#if PG14_GE
static BatchFilter *

View File

@ -376,22 +376,21 @@ extern enum CompressionAlgorithms compress_get_default_algorithm(Oid typeoid);
* to pollute the logs.
*/
#ifndef TS_COMPRESSION_FUZZING
#define CORRUPT_DATA_MESSAGE \
(errmsg("the compressed data is corrupt"), errcode(ERRCODE_DATA_CORRUPTED))
#define CORRUPT_DATA_MESSAGE(X) \
(errmsg("the compressed data is corrupt"), errdetail(X), errcode(ERRCODE_DATA_CORRUPTED))
#else
#define CORRUPT_DATA_MESSAGE (errcode(ERRCODE_DATA_CORRUPTED))
#define CORRUPT_DATA_MESSAGE(X) (errcode(ERRCODE_DATA_CORRUPTED))
#endif
#define CheckCompressedData(X) \
if (unlikely(!(X))) \
ereport(ERROR, CORRUPT_DATA_MESSAGE)
ereport(ERROR, CORRUPT_DATA_MESSAGE(#X))
inline static void *
consumeCompressedData(StringInfo si, int bytes)
{
CheckCompressedData(bytes >= 0);
CheckCompressedData(bytes < PG_INT32_MAX / 2);
CheckCompressedData(si->cursor + bytes >= 0);
CheckCompressedData(si->cursor + bytes >= si->cursor); /* Check for overflow. */
CheckCompressedData(si->cursor + bytes <= si->len);
void *result = si->data + si->cursor;
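As an aside for readers of this hunk (a sketch, not part of the diff): in a non-fuzzing build, a check such as CheckCompressedData(has_nulls == 0 || has_nulls == 1) from the recv functions below expands roughly to

if (unlikely(!(has_nulls == 0 || has_nulls == 1)))
    ereport(ERROR,
            (errmsg("the compressed data is corrupt"),
             errdetail("has_nulls == 0 || has_nulls == 1"),
             errcode(ERRCODE_DATA_CORRUPTED)));

so the stringified condition becomes the error detail. Under TS_COMPRESSION_FUZZING only the errcode remains, which keeps fuzzer runs from spamming the log.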

View File

@ -0,0 +1,459 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <math.h>
#include <postgres.h>
#include <libpq/pqformat.h>
#include <funcapi.h>
#include <utils/builtins.h>
#include "compression_test.h"
#include "compression.h"
#include "arrow_c_data_interface.h"
#if !defined(NDEBUG) || defined(TS_COMPRESSION_FUZZING)
static int
get_compression_algorithm(char *name)
{
if (pg_strcasecmp(name, "deltadelta") == 0)
{
return COMPRESSION_ALGORITHM_DELTADELTA;
}
else if (pg_strcasecmp(name, "gorilla") == 0)
{
return COMPRESSION_ALGORITHM_GORILLA;
}
else if (pg_strcasecmp(name, "array") == 0)
{
return COMPRESSION_ALGORITHM_ARRAY;
}
else if (pg_strcasecmp(name, "dictionary") == 0)
{
return COMPRESSION_ALGORITHM_DICTIONARY;
}
ereport(ERROR, (errmsg("unknown comrpession algorithm %s", name)));
return _INVALID_COMPRESSION_ALGORITHM;
}
/*
* Specializations of test functions for arithmetic types.
*/
#define ALGO GORILLA
#define CTYPE float8
#define PG_TYPE_PREFIX FLOAT8
#define DATUM_TO_CTYPE DatumGetFloat8
#include "decompress_arithmetic_test_impl.c"
#undef ALGO
#undef CTYPE
#undef PG_TYPE_PREFIX
#undef DATUM_TO_CTYPE
#define ALGO DELTADELTA
#define CTYPE int64
#define PG_TYPE_PREFIX INT8
#define DATUM_TO_CTYPE DatumGetInt64
#include "decompress_arithmetic_test_impl.c"
#undef ALGO
#undef CTYPE
#undef PG_TYPE_PREFIX
#undef DATUM_TO_CTYPE
/*
* The table of the supported testing configurations. We use it to generate
* dispatch tables and specializations of test functions.
*/
#define APPLY_FOR_TYPES(X) \
X(GORILLA, FLOAT8, true) \
X(GORILLA, FLOAT8, false) \
X(DELTADELTA, INT8, true) \
X(DELTADELTA, INT8, false) \
X(ARRAY, TEXT, false) \
X(DICTIONARY, TEXT, false)
static int (*get_decompress_fn(int algo, Oid type))(const uint8 *Data, size_t Size, bool bulk)
{
#define DISPATCH(ALGO, PGTYPE, BULK) \
if (algo == COMPRESSION_ALGORITHM_##ALGO && type == PGTYPE##OID) \
{ \
return decompress_##ALGO##_##PGTYPE; \
}
APPLY_FOR_TYPES(DISPATCH)
elog(ERROR,
"no decompression function for compression algorithm %d with element type %d",
algo,
type);
pg_unreachable();
#undef DISPATCH
}
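To make the X-macro dispatch easier to follow, here is what the (GORILLA, FLOAT8, true) entry of APPLY_FOR_TYPES turns the DISPATCH macro above into (a hand-expanded sketch; BULK is unused in this particular dispatch, so the bulk and row-by-row entries generate the same branch here):

if (algo == COMPRESSION_ALGORITHM_GORILLA && type == FLOAT8OID)
{
    return decompress_GORILLA_FLOAT8;
}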
/*
* Read and decompress compressed data from file. Useful for debugging the
* results of fuzzing.
* The out parameter bytes is volatile because we want to fill it even
* if we error out later.
*/
static void
read_compressed_data_file_impl(int algo, Oid type, const char *path, bool bulk, volatile int *bytes,
int *rows)
{
FILE *f = fopen(path, "r");
if (!f)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FILE), errmsg("could not open the file '%s'", path)));
}
fseek(f, 0, SEEK_END);
const size_t fsize = ftell(f);
fseek(f, 0, SEEK_SET); /* same as rewind(f); */
*rows = 0;
*bytes = fsize;
if (fsize == 0)
{
/*
* Skip empty data, because we'll just get "no data left in message"
* right away.
*/
return;
}
char *string = palloc(fsize + 1);
size_t elements_read = fread(string, fsize, 1, f);
if (elements_read != 1)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FILE), errmsg("failed to read file '%s'", path)));
}
fclose(f);
string[fsize] = 0;
*rows = get_decompress_fn(algo, type)((const uint8 *) string, fsize, bulk);
}
TS_FUNCTION_INFO_V1(ts_read_compressed_data_file);
/* Read and decompress compressed data from file -- SQL-callable wrapper. */
Datum
ts_read_compressed_data_file(PG_FUNCTION_ARGS)
{
int rows;
int bytes;
read_compressed_data_file_impl(get_compression_algorithm(PG_GETARG_CSTRING(0)),
PG_GETARG_OID(1),
PG_GETARG_CSTRING(2),
PG_GETARG_BOOL(3),
&bytes,
&rows);
PG_RETURN_INT32(rows);
}
TS_FUNCTION_INFO_V1(ts_read_compressed_data_directory);
/*
* Read and decompress all compressed data files from a directory. Useful for
* checking the fuzzing corpuses in the regression tests.
*/
Datum
ts_read_compressed_data_directory(PG_FUNCTION_ARGS)
{
/* Output columns of this function. */
enum
{
out_path = 0,
out_bytes,
out_rows,
out_sqlstate,
out_location,
_out_columns
};
/* Cross-call context for this set-returning function. */
struct user_context
{
DIR *dp;
struct dirent *ep;
};
char *name = PG_GETARG_CSTRING(2);
const int algo = get_compression_algorithm(PG_GETARG_CSTRING(0));
FuncCallContext *funcctx;
struct user_context *c;
MemoryContext call_memory_context = CurrentMemoryContext;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &funcctx->tuple_desc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
/*
* generate attribute metadata needed later to produce tuples from raw
* C strings
*/
funcctx->attinmeta = TupleDescGetAttInMetadata(funcctx->tuple_desc);
funcctx->user_fctx = palloc(sizeof(struct user_context));
c = funcctx->user_fctx;
c->dp = opendir(name);
if (!c->dp)
{
elog(ERROR, "could not open directory '%s'", name);
}
MemoryContextSwitchTo(call_memory_context);
}
funcctx = SRF_PERCALL_SETUP();
c = (struct user_context *) funcctx->user_fctx;
Datum values[_out_columns] = { 0 };
bool nulls[_out_columns] = { 0 };
for (int i = 0; i < _out_columns; i++)
{
nulls[i] = true;
}
while ((c->ep = readdir(c->dp)))
{
if (c->ep->d_name[0] == '.')
{
continue;
}
char *path = psprintf("%s/%s", name, c->ep->d_name);
/* The return values are: path, ret, sqlstate, status, location. */
values[out_path] = PointerGetDatum(cstring_to_text(path));
nulls[out_path] = false;
int rows;
volatile int bytes = 0;
PG_TRY();
{
read_compressed_data_file_impl(algo,
PG_GETARG_OID(1),
path,
PG_GETARG_BOOL(3),
&bytes,
&rows);
values[out_rows] = Int32GetDatum(rows);
nulls[out_rows] = false;
}
PG_CATCH();
{
MemoryContextSwitchTo(call_memory_context);
ErrorData *error = CopyErrorData();
values[out_sqlstate] =
PointerGetDatum(cstring_to_text(unpack_sql_state(error->sqlerrcode)));
nulls[out_sqlstate] = false;
if (error->filename)
{
values[out_location] = PointerGetDatum(
cstring_to_text(psprintf("%s:%d", error->filename, error->lineno)));
nulls[out_location] = false;
}
FlushErrorState();
}
PG_END_TRY();
values[out_bytes] = Int32GetDatum(bytes);
nulls[out_bytes] = false;
SRF_RETURN_NEXT(funcctx,
HeapTupleGetDatum(heap_form_tuple(funcctx->tuple_desc, values, nulls)));
}
(void) closedir(c->dp);
SRF_RETURN_DONE(funcctx);
}
#endif
#ifdef TS_COMPRESSION_FUZZING
/*
* Fuzzing target for all supported types.
*/
static int
target_generic(const uint8 *Data, size_t Size, CompressionAlgorithm requested_algo, Oid pg_type,
bool bulk)
{
StringInfoData si = { .data = (char *) Data, .len = Size };
const CompressionAlgorithm data_algo = pq_getmsgbyte(&si);
CheckCompressedData(data_algo > 0 && data_algo < _END_COMPRESSION_ALGORITHMS);
if (data_algo != requested_algo)
{
/*
* It's convenient to fuzz only one algorithm at a time. We specialize
* the fuzz target for one algorithm, so that the fuzzer doesn't waste
* time discovering others from scratch.
*/
return -1;
}
const CompressionAlgorithmDefinition *def = algorithm_definition(data_algo);
Datum compressed_data = def->compressed_data_recv(&si);
if (bulk)
{
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(data_algo, pg_type);
decompress_all(compressed_data, pg_type, CurrentMemoryContext);
return 0;
}
DecompressionIterator *iter = def->iterator_init_forward(compressed_data, pg_type);
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
;
return 0;
}
/*
* This is a wrapper for the fuzzing target. It will be called by the libfuzzer driver.
* It has to catch the postgres exceptions normally produced for corrupt data.
*/
static int
target_wrapper(const uint8_t *Data, size_t Size, CompressionAlgorithm requested_algo, Oid pg_type,
bool bulk)
{
MemoryContextReset(CurrentMemoryContext);
int res = 0;
PG_TRY();
{
CHECK_FOR_INTERRUPTS();
res = target_generic(Data, Size, requested_algo, pg_type, bulk);
}
PG_CATCH();
{
/* EmitErrorReport(); */
FlushErrorState();
}
PG_END_TRY();
/*
* -1 means "don't include it into the corpus"; return it if the test function
* says so, otherwise return 0. Some test functions also return the number
* of rows for correct data, but the fuzzer doesn't understand these values.
*/
return res == -1 ? -1 : 0;
}
/*
* Specializations of fuzzing targets for supported types that will be directly
* called by the fuzzing driver.
*/
#define DECLARE_TARGET(ALGO, PGTYPE, BULK) \
static int target_##ALGO##_##PGTYPE##_##BULK(const uint8_t *D, size_t S) \
{ \
return target_wrapper(D, S, COMPRESSION_ALGORITHM_##ALGO, PGTYPE##OID, BULK); \
}
APPLY_FOR_TYPES(DECLARE_TARGET)
#undef DECLARE_TARGET
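DECLARE_TARGET, by contrast, does paste the BULK flag into the name, so each APPLY_FOR_TYPES entry yields its own wrapper; the (GORILLA, FLOAT8, true) entry expands roughly to this sketch:

static int
target_GORILLA_FLOAT8_true(const uint8_t *D, size_t S)
{
    return target_wrapper(D, S, COMPRESSION_ALGORITHM_GORILLA, FLOAT8OID, true);
}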
/*
* libfuzzer fuzzing driver that we import from LLVM libraries. It will run our
* test functions with random inputs.
*/
extern int LLVMFuzzerRunDriver(int *argc, char ***argv,
int (*UserCb)(const uint8_t *Data, size_t Size));
/*
* The SQL function to perform fuzzing.
*/
TS_FUNCTION_INFO_V1(ts_fuzz_compression);
Datum
ts_fuzz_compression(PG_FUNCTION_ARGS)
{
/*
* We use the memory context size larger than default here, so that all data
* allocated by fuzzing fit into the first chunk. The first chunk is not
* deallocated when the memory context is reset, so this reduces overhead
* caused by repeated reallocations.
* The particular value of 8MB is somewhat arbitrary and large. In practice,
* we have inputs of 1k rows max here, which decompress to 8 kB max.
*/
MemoryContext fuzzing_context =
AllocSetContextCreate(CurrentMemoryContext, "fuzzing", 0, 8 * 1024 * 1024, 8 * 1024 * 1024);
MemoryContext old_context = MemoryContextSwitchTo(fuzzing_context);
char *argvdata[] = { "PostgresFuzzer",
"-timeout=1",
"-report_slow_units=1",
// "-use_value_profile=1",
"-reload=1",
//"-print_coverage=1",
//"-print_full_coverage=1",
//"-print_final_stats=1",
//"-help=1",
psprintf("-runs=%d", PG_GETARG_INT32(3)),
"corpus" /* in the database directory */,
NULL };
char **argv = argvdata;
int argc = sizeof(argvdata) / sizeof(*argvdata) - 1;
int algo = get_compression_algorithm(PG_GETARG_CSTRING(0));
Oid type = PG_GETARG_OID(1);
bool bulk = PG_GETARG_BOOL(2);
int (*target)(const uint8_t *, size_t) = NULL;
#define DISPATCH(ALGO, PGTYPE, BULK) \
if (algo == COMPRESSION_ALGORITHM_##ALGO && type == PGTYPE##OID && bulk == BULK) \
{ \
target = target_##ALGO##_##PGTYPE##_##BULK; \
}
APPLY_FOR_TYPES(DISPATCH)
#undef DISPATCH
if (target == NULL)
{
elog(ERROR, "no llvm fuzz target for compression algorithm %d and type %d", algo, type);
}
int res = LLVMFuzzerRunDriver(&argc, &argv, target);
MemoryContextSwitchTo(old_context);
PG_RETURN_INT32(res);
}
#endif

View File

@ -0,0 +1,14 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#pragma once
#include "compression.h"
int decompress_ARRAY_TEXT(const uint8 *Data, size_t Size, bool bulk);
int decompress_DICTIONARY_TEXT(const uint8 *Data, size_t Size, bool bulk);
const CompressionAlgorithmDefinition *algorithm_definition(CompressionAlgorithm algo);

View File

@ -20,6 +20,8 @@
#include "datum_serialize.h"
#include "compat/compat.h"
#include "compression.h"
typedef struct DatumSerializer
{
Oid type_oid;
@ -305,6 +307,22 @@ bytes_to_datum_and_advance(DatumDeserializer *deserializer, const char **ptr)
*ptr =
(Pointer) att_align_pointer(*ptr, deserializer->type_align, deserializer->type_len, *ptr);
if (deserializer->type_len == -1)
{
/*
* Check for potentially corrupt varlena headers since we're reading them
* directly from compressed data. We can only have a plain datum
* with 1-byte or 4-byte header here, no TOAST or compressed data.
*/
CheckCompressedData(VARATT_IS_4B_U(*ptr) || (VARATT_IS_1B(*ptr) && !VARATT_IS_1B_E(*ptr)));
/*
* Full varsize must be greater than or equal to the header size so that the
* calculation of size without header doesn't overflow.
*/
CheckCompressedData((VARATT_IS_1B(*ptr) && VARSIZE_1B(*ptr) >= VARHDRSZ_SHORT) ||
(VARSIZE_4B(*ptr) > VARHDRSZ));
}
res = fetch_att(*ptr, deserializer->type_by_val, deserializer->type_len);
*ptr = att_addlength_pointer(*ptr, deserializer->type_len, *ptr);
return res;
@ -343,8 +361,7 @@ binary_string_get_type(StringInfo buffer)
Anum_pg_type_oid,
PointerGetDatum(element_type_name),
ObjectIdGetDatum(namespace_oid));
if (!OidIsValid(type_oid))
elog(ERROR, "could not find type %s.%s", element_type_namespace, element_type_name);
CheckCompressedData(OidIsValid(type_oid));
return type_oid;
}

View File

@ -0,0 +1,216 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#define FUNCTION_NAME_HELPER3(X, Y, Z) X##_##Y##_##Z
#define FUNCTION_NAME3(X, Y, Z) FUNCTION_NAME_HELPER3(X, Y, Z)
#define FUNCTION_NAME_HELPER2(X, Y) X##_##Y
#define FUNCTION_NAME2(X, Y) FUNCTION_NAME_HELPER2(X, Y)
#define PG_TYPE_OID_HELPER(X) X##OID
#define PG_TYPE_OID_HELPER2(X) PG_TYPE_OID_HELPER(X)
#define PG_TYPE_OID PG_TYPE_OID_HELPER2(PG_TYPE_PREFIX)
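For orientation (a sketch, not part of the diff): when this file is included with ALGO=GORILLA, CTYPE=float8 and PG_TYPE_PREFIX=FLOAT8, the helpers above expand as follows:

/* FUNCTION_NAME2(check_arrow, CTYPE)               -> check_arrow_float8         */
/* FUNCTION_NAME3(decompress, ALGO, PG_TYPE_PREFIX) -> decompress_GORILLA_FLOAT8  */
/* PG_TYPE_OID                                      -> FLOAT8OID                  */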
static void
FUNCTION_NAME2(check_arrow, CTYPE)(ArrowArray *arrow, int error_type, DecompressResult *results,
int n)
{
if (n != arrow->length)
{
ereport(error_type,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("Expected %d elements, got %d.", n, (int) arrow->length)));
}
for (int i = 0; i < n; i++)
{
const bool arrow_isnull = !arrow_row_is_valid(arrow->buffers[0], i);
if (arrow_isnull != results[i].is_null)
{
ereport(error_type,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("Expected null %d, got %d at row %d.",
results[i].is_null,
arrow_isnull,
i)));
}
if (!results[i].is_null)
{
const CTYPE arrow_value = ((CTYPE *) arrow->buffers[1])[i];
const CTYPE rowbyrow_value = DATUM_TO_CTYPE(results[i].val);
/*
* Floats can also be NaN/infinite and the comparison doesn't
* work in that case.
*/
if (isfinite((double) arrow_value) != isfinite((double) rowbyrow_value))
{
ereport(error_type,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("At row %d\n", i)));
}
if (isfinite((double) arrow_value) && arrow_value != rowbyrow_value)
{
ereport(error_type,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("At row %d\n", i)));
}
}
}
}
/*
* Try to decompress the given compressed data. Used for fuzzing and for checking
* the examples found by fuzzing. For fuzzing we do fewer checks to keep it
* faster and the coverage space smaller. This is a generic implementation
* for arithmetic types.
*/
static int
FUNCTION_NAME3(decompress, ALGO, PG_TYPE_PREFIX)(const uint8 *Data, size_t Size, bool bulk)
{
StringInfoData si = { .data = (char *) Data, .len = Size };
const int data_algo = pq_getmsgbyte(&si);
CheckCompressedData(data_algo > 0 && data_algo < _END_COMPRESSION_ALGORITHMS);
if (data_algo != FUNCTION_NAME2(COMPRESSION_ALGORITHM, ALGO))
{
/*
* It's convenient to fuzz only one algorithm at a time. We specialize
* the fuzz target for one algorithm, so that the fuzzer doesn't waste
* time discovering others from scratch.
*/
return -1;
}
const CompressionAlgorithmDefinition *def = algorithm_definition(data_algo);
Datum compressed_data = def->compressed_data_recv(&si);
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(data_algo, PG_TYPE_OID);
ArrowArray *arrow = NULL;
if (bulk)
{
/*
* Test bulk decompression. Have to do this before row-by-row decompression
* so that the latter doesn't hide the errors.
*/
arrow = decompress_all(compressed_data, PG_TYPE_OID, CurrentMemoryContext);
}
/*
* Test row-by-row decompression.
*/
DecompressionIterator *iter = def->iterator_init_forward(compressed_data, PG_TYPE_OID);
DecompressResult results[GLOBAL_MAX_ROWS_PER_COMPRESSION];
int n = 0;
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
{
if (n >= GLOBAL_MAX_ROWS_PER_COMPRESSION)
{
elog(ERROR, "too many compressed rows");
}
results[n++] = r;
}
/* Check that both ways of decompression match. */
if (bulk)
{
FUNCTION_NAME2(check_arrow, CTYPE)(arrow, ERROR, results, n);
return n;
}
/*
* For row-by-row decompression, check that the result is still the same
* after we compress and decompress back.
*
* 1) Compress.
*/
Compressor *compressor = def->compressor_for_type(PG_TYPE_OID);
for (int i = 0; i < n; i++)
{
if (results[i].is_null)
{
compressor->append_null(compressor);
}
else
{
compressor->append_val(compressor, results[i].val);
}
}
compressed_data = (Datum) compressor->finish(compressor);
if (compressed_data == 0)
{
/* The gorilla compressor returns NULL for all-null input sets. */
return n;
};
/*
* 2) Decompress and check that it's the same.
*/
iter = def->iterator_init_forward(compressed_data, PG_TYPE_OID);
int nn = 0;
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
{
if (r.is_null != results[nn].is_null)
{
elog(ERROR, "the repeated decompression result doesn't match");
}
if (!r.is_null)
{
CTYPE old_value = DATUM_TO_CTYPE(results[nn].val);
CTYPE new_value = DATUM_TO_CTYPE(r.val);
/*
* Floats can also be NaN/infinite and the comparison doesn't
* work in that case.
*/
if (isfinite((double) old_value) != isfinite((double) new_value))
{
elog(ERROR, "the repeated decompression result doesn't match");
}
if (isfinite((double) old_value) && old_value != new_value)
{
elog(ERROR, "the repeated decompression result doesn't match");
}
}
nn++;
if (nn > n)
{
elog(ERROR, "the repeated recompression result doesn't match");
}
}
/*
* 3) The bulk decompression must absolutely work on the correct compressed
* data we've just generated.
*/
arrow = decompress_all(compressed_data, PG_TYPE_OID, CurrentMemoryContext);
FUNCTION_NAME2(check_arrow, CTYPE)(arrow, PANIC, results, n);
return n;
}
#undef FUNCTION_NAME3
#undef FUNCTION_NAME_HELPER3
#undef FUNCTION_NAME2
#undef FUNCTION_NAME_HELPER2
#undef PG_TYPE_OID
#undef PG_TYPE_OID_HELPER
#undef PG_TYPE_OID_HELPER2

View File

@ -1,204 +0,0 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#define FUNCTION_NAME_HELPER(X, Y) decompress_##X##_##Y
#define FUNCTION_NAME(X, Y) FUNCTION_NAME_HELPER(X, Y)
#define TOSTRING_HELPER(x) #x
#define TOSTRING(x) TOSTRING_HELPER(x)
/*
* Try to decompress the given compressed data. Used for fuzzing and for checking
* the examples found by fuzzing. For fuzzing we do less checks to keep it
* faster and the coverage space smaller.
*/
static int
FUNCTION_NAME(ALGO, CTYPE)(const uint8 *Data, size_t Size, bool extra_checks)
{
StringInfoData si = { .data = (char *) Data, .len = Size };
const int algo = pq_getmsgbyte(&si);
CheckCompressedData(algo > 0 && algo < _END_COMPRESSION_ALGORITHMS);
if (algo != get_compression_algorithm(TOSTRING(ALGO)))
{
/*
* It's convenient to fuzz only one algorithm at a time. We specialize
* the fuzz target for one algorithm, so that the fuzzer doesn't waste
* time discovering others from scratch.
*/
return -1;
}
Datum compressed_data = definitions[algo].compressed_data_recv(&si);
if (!extra_checks)
{
/*
* For routine fuzzing, we only run bulk decompression to make it faster
* and the coverage space smaller.
*/
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
return 0;
}
/*
* Test bulk decompression. This might hide some errors in the row-by-row
* decompression, but testing both is significantly more complicated, and
* the row-by-row is old and stable.
*/
ArrowArray *arrow = NULL;
DecompressAllFunction decompress_all = tsl_get_decompress_all_function(algo, PGTYPE);
if (decompress_all)
{
arrow = decompress_all(compressed_data, PGTYPE, CurrentMemoryContext);
}
/*
* Test row-by-row decompression.
*/
DecompressionIterator *iter = definitions[algo].iterator_init_forward(compressed_data, PGTYPE);
DecompressResult results[GLOBAL_MAX_ROWS_PER_COMPRESSION];
int n = 0;
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
{
if (n >= GLOBAL_MAX_ROWS_PER_COMPRESSION)
{
elog(ERROR, "too many compressed rows");
}
results[n++] = r;
}
/* Check that both ways of decompression match. */
if (arrow)
{
if (n != arrow->length)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("Expected %d elements, got %d.", n, (int) arrow->length)));
}
for (int i = 0; i < n; i++)
{
const bool arrow_isnull = !arrow_row_is_valid(arrow->buffers[0], i);
if (arrow_isnull != results[i].is_null)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("Expected null %d, got %d at row %d.",
results[i].is_null,
arrow_isnull,
i)));
}
if (!results[i].is_null)
{
const CTYPE arrow_value = ((CTYPE *) arrow->buffers[1])[i];
const CTYPE rowbyrow_value = DATUM_TO_CTYPE(results[i].val);
/*
* Floats can also be NaN/infinite and the comparison doesn't
* work in that case.
*/
if (isfinite((double) arrow_value) != isfinite((double) rowbyrow_value))
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("At row %d\n", i)));
}
if (isfinite((double) arrow_value) && arrow_value != rowbyrow_value)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("the bulk decompression result does not match"),
errdetail("At row %d\n", i)));
}
}
}
}
/*
* Check that the result is still the same after we compress and decompress
* back.
*
* 1) Compress.
*/
Compressor *compressor = definitions[algo].compressor_for_type(PGTYPE);
for (int i = 0; i < n; i++)
{
if (results[i].is_null)
{
compressor->append_null(compressor);
}
else
{
compressor->append_val(compressor, results[i].val);
}
}
compressed_data = (Datum) compressor->finish(compressor);
if (compressed_data == 0)
{
/* The gorilla compressor returns NULL for all-null input sets. */
return n;
};
/*
* 2) Decompress and check that it's the same.
*/
iter = definitions[algo].iterator_init_forward(compressed_data, PGTYPE);
int nn = 0;
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
{
if (r.is_null != results[nn].is_null)
{
elog(ERROR, "the repeated decompression result doesn't match");
}
if (!r.is_null)
{
CTYPE old_value = DATUM_TO_CTYPE(results[nn].val);
CTYPE new_value = DATUM_TO_CTYPE(r.val);
/*
* Floats can also be NaN/infinite and the comparison doesn't
* work in that case.
*/
if (isfinite((double) old_value) != isfinite((double) new_value))
{
elog(ERROR, "the repeated decompression result doesn't match");
}
if (isfinite((double) old_value) && old_value != new_value)
{
elog(ERROR, "the repeated decompression result doesn't match");
}
}
nn++;
if (nn > n)
{
elog(ERROR, "the repeated recompression result doesn't match");
}
}
return n;
}
#undef TOSTRING
#undef TOSTRING_HELPER
#undef FUNCTION_NAME
#undef FUNCTION_NAME_HELPER

View File

@ -0,0 +1,152 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <libpq/pqformat.h>
#include "compression.h"
#include "compression_test.h"
/*
* Try to decompress the given compressed data. Used for fuzzing and for checking
* the examples found by fuzzing. For fuzzing we do fewer checks to keep it
* faster and the coverage space smaller. This is a generic implementation
* for text types.
*/
static int
decompress_generic_text(const uint8 *Data, size_t Size, bool bulk, int requested_algo)
{
if (bulk)
{
elog(ERROR, "bulk decompression not supported for text");
}
StringInfoData si = { .data = (char *) Data, .len = Size };
const int data_algo = pq_getmsgbyte(&si);
CheckCompressedData(data_algo > 0 && data_algo < _END_COMPRESSION_ALGORITHMS);
if (data_algo != requested_algo)
{
/*
* It's convenient to fuzz only one algorithm at a time. We specialize
* the fuzz target for one algorithm, so that the fuzzer doesn't waste
* time discovering others from scratch.
*/
return -1;
}
const CompressionAlgorithmDefinition *def = algorithm_definition(data_algo);
Datum compressed_data = def->compressed_data_recv(&si);
/*
* Test row-by-row decompression.
*/
DecompressionIterator *iter = def->iterator_init_forward(compressed_data, TEXTOID);
DecompressResult results[GLOBAL_MAX_ROWS_PER_COMPRESSION];
int n = 0;
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
{
if (n >= GLOBAL_MAX_ROWS_PER_COMPRESSION)
{
elog(ERROR, "too many compressed rows");
}
results[n++] = r;
}
/*
* For row-by-row decompression, check that the result is still the same
* after we compress and decompress back.
* Don't perform this check for other types of tests.
*/
if (bulk)
{
return n;
}
/*
* 1) Compress.
*/
Compressor *compressor = def->compressor_for_type(TEXTOID);
for (int i = 0; i < n; i++)
{
if (results[i].is_null)
{
compressor->append_null(compressor);
}
else
{
compressor->append_val(compressor, results[i].val);
}
}
compressed_data = (Datum) compressor->finish(compressor);
if (compressed_data == 0)
{
/* Some compressors return NULL when all rows are null. */
return n;
}
/*
* 2) Decompress and check that it's the same.
*/
iter = def->iterator_init_forward(compressed_data, TEXTOID);
int nn = 0;
for (DecompressResult r = iter->try_next(iter); !r.is_done; r = iter->try_next(iter))
{
if (r.is_null != results[nn].is_null)
{
elog(ERROR, "the repeated decompression result doesn't match");
}
if (!r.is_null)
{
const Datum old_value = results[nn].val;
const Datum new_value = r.val;
/*
* For text, compare the old and new values by length and content; they
* must be identical after recompression.
*/
if (VARSIZE_ANY_EXHDR(old_value) != VARSIZE_ANY_EXHDR(new_value))
{
elog(ERROR, "the repeated decompression result doesn't match");
}
if (strncmp(VARDATA_ANY(old_value),
VARDATA_ANY(new_value),
VARSIZE_ANY_EXHDR(new_value)))
{
elog(ERROR, "the repeated decompression result doesn't match");
}
}
nn++;
if (nn > n)
{
elog(ERROR, "the repeated recompression result doesn't match");
}
}
return n;
}
int
decompress_ARRAY_TEXT(const uint8 *Data, size_t Size, bool bulk)
{
return decompress_generic_text(Data, Size, bulk, COMPRESSION_ALGORITHM_ARRAY);
}
int
decompress_DICTIONARY_TEXT(const uint8 *Data, size_t Size, bool bulk)
{
return decompress_generic_text(Data, Size, bulk, COMPRESSION_ALGORITHM_DICTIONARY);
}

View File

@ -75,6 +75,9 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
*
* Also tried using SIMD prefix sum from here twice:
* https://en.algorithmica.org/hpc/algorithms/prefix/, it's slower.
*
* Also tried zig-zag decoding in a separate loop, seems to be slightly
* slower, around the noise threshold.
*/
#define INNER_LOOP_SIZE 8
Assert(n_notnull_padded % INNER_LOOP_SIZE == 0);

View File

@ -450,7 +450,7 @@ dictionary_decompression_iterator_try_next_forward(DecompressionIterator *iter_b
.is_done = true,
};
Assert(result.val < iter->compressed->num_distinct);
CheckCompressedData(result.val < iter->compressed->num_distinct);
return (DecompressResult){
.val = iter->values[result.val],
.is_null = false,
@ -596,7 +596,7 @@ dictionary_compressed_send(CompressedDataHeader *header, StringInfo buffer)
Datum
dictionary_compressed_recv(StringInfo buffer)
{
DictionaryCompressorSerializationInfo data = { 0 };
DictionaryCompressorSerializationInfo info = { 0 };
uint8 has_nulls;
Oid element_type;
@ -604,27 +604,30 @@ dictionary_compressed_recv(StringInfo buffer)
CheckCompressedData(has_nulls == 0 || has_nulls == 1);
element_type = binary_string_get_type(buffer);
data.dictionary_compressed_indexes = simple8brle_serialized_recv(buffer);
data.bitmaps_size = simple8brle_serialized_total_size(data.dictionary_compressed_indexes);
data.total_size = MAXALIGN(sizeof(DictionaryCompressed)) + data.bitmaps_size;
info.dictionary_compressed_indexes = simple8brle_serialized_recv(buffer);
info.bitmaps_size = simple8brle_serialized_total_size(info.dictionary_compressed_indexes);
info.total_size = MAXALIGN(sizeof(DictionaryCompressed)) + info.bitmaps_size;
if (has_nulls)
{
data.compressed_nulls = simple8brle_serialized_recv(buffer);
data.nulls_size = simple8brle_serialized_total_size(data.compressed_nulls);
data.total_size += data.nulls_size;
info.compressed_nulls = simple8brle_serialized_recv(buffer);
info.nulls_size = simple8brle_serialized_total_size(info.compressed_nulls);
info.total_size += info.nulls_size;
}
data.dictionary_serialization_info = array_compressed_data_recv(buffer, element_type);
data.dictionary_size = array_compression_serialization_size(data.dictionary_serialization_info);
data.total_size += data.dictionary_size;
data.num_distinct =
array_compression_serialization_num_elements(data.dictionary_serialization_info);
info.dictionary_serialization_info = array_compressed_data_recv(buffer, element_type);
if (!AllocSizeIsValid(data.total_size))
CheckCompressedData(info.dictionary_serialization_info != NULL);
info.dictionary_size = array_compression_serialization_size(info.dictionary_serialization_info);
info.total_size += info.dictionary_size;
info.num_distinct =
array_compression_serialization_num_elements(info.dictionary_serialization_info);
if (!AllocSizeIsValid(info.total_size))
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("compressed size exceeds the maximum allowed (%d)", (int) MaxAllocSize)));
return PointerGetDatum(dictionary_compressed_from_serialization_info(data, element_type));
return PointerGetDatum(dictionary_compressed_from_serialization_info(info, element_type));
}

View File

@ -56,10 +56,13 @@ FUNCTION_NAME(simple8brle_decompress_all_buf,
const uint16 n_block_values = simple8brle_rledata_repeatcount(block_data);
CheckCompressedData(decompressed_index + n_block_values <= n_buffer_elements);
const ELEMENT_TYPE repeated_value = simple8brle_rledata_value(block_data);
const uint64 repeated_value_raw = simple8brle_rledata_value(block_data);
const ELEMENT_TYPE repeated_value_converted = repeated_value_raw;
CheckCompressedData(repeated_value_raw == (uint64) repeated_value_converted);
for (uint16 i = 0; i < n_block_values; i++)
{
decompressed_values[decompressed_index + i] = repeated_value;
decompressed_values[decompressed_index + i] = repeated_value_converted;
}
decompressed_index += n_block_values;
@ -77,7 +80,7 @@ FUNCTION_NAME(simple8brle_decompress_all_buf,
* produces, which is easier for testing. \
*/ \
const uint8 bits_per_value = SIMPLE8B_BIT_LENGTH[X]; \
CheckCompressedData(bits_per_value / 8 <= sizeof(ELEMENT_TYPE)); \
CheckCompressedData(bits_per_value <= sizeof(ELEMENT_TYPE) * 8); \
\
/* \
* The last block might have less values than normal, but we have \
@ -86,7 +89,7 @@ FUNCTION_NAME(simple8brle_decompress_all_buf,
* might be incorrect. \
*/ \
const uint16 n_block_values = SIMPLE8B_NUM_ELEMENTS[X]; \
CheckCompressedData(decompressed_index + n_block_values < n_buffer_elements); \
CheckCompressedData(decompressed_index + n_block_values <= n_buffer_elements); \
\
const uint64 bitmask = simple8brle_selector_get_bitmask(X); \
\
@ -155,10 +158,11 @@ FUNCTION_NAME(simple8brle_decompress_all, ELEMENT_TYPE)(Simple8bRleSerialized *c
Assert(n_total_values <= GLOBAL_MAX_ROWS_PER_COMPRESSION);
/*
* We need a significant padding of 64 elements, not bytes, here, because we
* work in Simple8B blocks which can contain up to 64 elements.
* We need a sizable padding of 63 elements, not bytes, after the last
* element, because we work in Simple8B blocks which can contain up to 64
* elements, so the block holding the last element may decode up to 63
* values past it.
*/
const uint16 n_buffer_elements = ((n_total_values + 63) / 64 + 1) * 64;
const uint16 n_buffer_elements = n_total_values + 63;
ELEMENT_TYPE *restrict decompressed_values = palloc(sizeof(ELEMENT_TYPE) * n_buffer_elements);

View File

@ -1542,31 +1542,80 @@ DROP TABLE base_texts;
-- Interesting corrupt data found by fuzzing --
-----------------------------------------------
\c :TEST_DBNAME :ROLE_SUPERUSER
create or replace function ts_read_compressed_data_directory(cstring, regtype, cstring)
returns table(path text, bytes int, rows int, sqlstate text, location text)
as :TSL_MODULE_PATHNAME, 'ts_read_compressed_data_directory' language c;
select count(*), coalesce((rows >= 0)::text, sqlstate) result
from ts_read_compressed_data_directory('gorilla', 'float8', (:'TEST_INPUT_DIR' || '/fuzzing/compression/gorilla-float8')::cstring)
group by 2 order by 1 desc;
count | result
-------+--------
224 | XX001
55 | true
23 | 08P01
(3 rows)
select count(*), coalesce((rows >= 0)::text, sqlstate) result
from ts_read_compressed_data_directory('deltadelta', 'int8', (:'TEST_INPUT_DIR' || '/fuzzing/compression/deltadelta-int8')::cstring)
group by 2 order by 1 desc;
count | result
-------+--------
157 | XX001
80 | true
13 | 08P01
1 | false
(4 rows)
create or replace function ts_read_compressed_data_file(cstring, regtype, cstring) returns int
create or replace function ts_read_compressed_data_file(cstring, regtype, cstring, bool = true) returns int
as :TSL_MODULE_PATHNAME, 'ts_read_compressed_data_file' language c;
\set ON_ERROR_STOP 0
select ts_read_compressed_data_file('gorilla', 'float8', '--nonexistent');
ERROR: could not open the file '--nonexistent'
\set ON_ERROR_STOP 1
create or replace function ts_read_compressed_data_directory(cstring, regtype, cstring, bool)
returns table(path text, bytes int, rows int, sqlstate text, location text)
as :TSL_MODULE_PATHNAME, 'ts_read_compressed_data_directory' language c;
\set fn 'ts_read_compressed_data_directory(:''algo'', :''type'', format(''%s/fuzzing/compression/%s-%s'', :''TEST_INPUT_DIR'', :''algo'', :''type'')::cstring, '
\set algo gorilla
\set type float8
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
count | bulk_result | rowbyrow_result
-------+-------------+-----------------
142 | XX001 | true
82 | XX001 | XX001
55 | true | true
23 | 08P01 | 08P01
(4 rows)
\set algo deltadelta
\set type int8
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
count | bulk_result | rowbyrow_result
-------+-------------+-----------------
106 | XX001 | XX001
69 | true | true
62 | XX001 | true
13 | 08P01 | 08P01
1 | false | false
(5 rows)
\set algo array
\set type text
select count(*)
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
;
count | rowbyrow_result
-------+-----------------
13 | XX001
4 | 08P01
3 | true
1 | false
1 | 22021
1 | 3F000
(6 rows)
\set algo dictionary
\set type text
select count(*)
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
;
count | rowbyrow_result
-------+-----------------
22 | XX001
4 | 08P01
2 | true
1 | false
1 | 22021
1 | 3F000
(6 rows)

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -376,19 +376,50 @@ DROP TABLE base_texts;
\c :TEST_DBNAME :ROLE_SUPERUSER
create or replace function ts_read_compressed_data_directory(cstring, regtype, cstring)
create or replace function ts_read_compressed_data_file(cstring, regtype, cstring, bool = true) returns int
as :TSL_MODULE_PATHNAME, 'ts_read_compressed_data_file' language c;
\set ON_ERROR_STOP 0
select ts_read_compressed_data_file('gorilla', 'float8', '--nonexistent');
\set ON_ERROR_STOP 1
create or replace function ts_read_compressed_data_directory(cstring, regtype, cstring, bool)
returns table(path text, bytes int, rows int, sqlstate text, location text)
as :TSL_MODULE_PATHNAME, 'ts_read_compressed_data_directory' language c;
select count(*), coalesce((rows >= 0)::text, sqlstate) result
from ts_read_compressed_data_directory('gorilla', 'float8', (:'TEST_INPUT_DIR' || '/fuzzing/compression/gorilla-float8')::cstring)
group by 2 order by 1 desc;
\set fn 'ts_read_compressed_data_directory(:''algo'', :''type'', format(''%s/fuzzing/compression/%s-%s'', :''TEST_INPUT_DIR'', :''algo'', :''type'')::cstring, '
select count(*), coalesce((rows >= 0)::text, sqlstate) result
from ts_read_compressed_data_directory('deltadelta', 'int8', (:'TEST_INPUT_DIR' || '/fuzzing/compression/deltadelta-int8')::cstring)
group by 2 order by 1 desc;
\set algo gorilla
\set type float8
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
create or replace function ts_read_compressed_data_file(cstring, regtype, cstring) returns int
as :TSL_MODULE_PATHNAME, 'ts_read_compressed_data_file' language c;
\set algo deltadelta
\set type int8
select count(*)
, coalesce((bulk.rows >= 0)::text, bulk.sqlstate) bulk_result
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn true) bulk join :fn false) rowbyrow using (path)
group by 2, 3 order by 1 desc
;
\set algo array
\set type text
select count(*)
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
;
\set algo dictionary
\set type text
select count(*)
, coalesce((rowbyrow.rows >= 0)::text, rowbyrow.sqlstate) rowbyrow_result
from :fn false) rowbyrow
group by 2 order by 1 desc
;
select ts_read_compressed_data_file('gorilla', 'float8', '--nonexistent');