diff --git a/src/adts/bit_array_impl.h b/src/adts/bit_array_impl.h index 1302cea5c..aad90a8d1 100644 --- a/src/adts/bit_array_impl.h +++ b/src/adts/bit_array_impl.h @@ -161,7 +161,11 @@ bytes_store_bit_array_and_advance(char *dest, size_t expected_size, const BitArr *num_buckets_out = bit_array_num_buckets(array); *bits_in_last_bucket_out = array->bits_used_in_last_bucket; - memcpy(dest, array->buckets.data, size); + if (size > 0) + { + Assert(array->buckets.data != NULL); + memcpy(dest, array->buckets.data, size); + } return dest + size; } diff --git a/tsl/src/compression/CMakeLists.txt b/tsl/src/compression/CMakeLists.txt index 40ac5b772..cf0010c59 100644 --- a/tsl/src/compression/CMakeLists.txt +++ b/tsl/src/compression/CMakeLists.txt @@ -1,9 +1,9 @@ set(SOURCES - ${CMAKE_CURRENT_SOURCE_DIR}/compression.c ${CMAKE_CURRENT_SOURCE_DIR}/array.c + ${CMAKE_CURRENT_SOURCE_DIR}/compression.c + ${CMAKE_CURRENT_SOURCE_DIR}/create.c + ${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c ${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c ${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c - ${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c - ${CMAKE_CURRENT_SOURCE_DIR}/create.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/compression/array.c b/tsl/src/compression/array.c index f9011af93..0379a0cb7 100644 --- a/tsl/src/compression/array.c +++ b/tsl/src/compression/array.c @@ -93,6 +93,13 @@ typedef struct ArrayCompressor bool has_nulls; } ArrayCompressor; +typedef struct ExtendedCompressor +{ + Compressor base; + ArrayCompressor *internal; + Oid element_type; +} ExtendedCompressor; + typedef struct ArrayDecompressionIterator { DecompressionIterator base; @@ -111,6 +118,53 @@ typedef struct ArrayDecompressionIterator *** Compressor *** ******************/ +static void +array_compressor_append_datum(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = 
array_compressor_alloc(extended->element_type); + + array_compressor_append(extended->internal, val); +} + +static void +array_compressor_append_null_value(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = array_compressor_alloc(extended->element_type); + + array_compressor_append_null(extended->internal); +} + +static void * +array_compressor_finish_and_reset(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + void *compressed = array_compressor_finish(extended->internal); + pfree(extended->internal); + extended->internal = NULL; + return compressed; +} + +const Compressor array_compressor = { + .append_val = array_compressor_append_datum, + .append_null = array_compressor_append_null_value, + .finish = array_compressor_finish_and_reset, +}; + +Compressor * +array_compressor_for_type(Oid element_type) +{ + ExtendedCompressor *compressor = palloc(sizeof(*compressor)); + *compressor = (ExtendedCompressor){ + .base = array_compressor, + .element_type = element_type, + }; + return &compressor->base; +} + ArrayCompressor * array_compressor_alloc(Oid type_to_compress) { @@ -193,7 +247,9 @@ array_compressor_get_serialization_info(ArrayCompressor *compressor) if (info->nulls != NULL) info->total += simple8brle_serialized_total_size(info->nulls); - info->total += simple8brle_serialized_total_size(info->sizes); + if (info->sizes != NULL) + info->total += simple8brle_serialized_total_size(info->sizes); + info->total += compressor->data.num_elements; return info; } @@ -262,10 +318,13 @@ array_compressed_from_serialization_info(ArrayCompressorSerializationInfo *info, return compressed_array; } -ArrayCompressed * +void * array_compressor_finish(ArrayCompressor *compressor) { ArrayCompressorSerializationInfo *info = array_compressor_get_serialization_info(compressor); + if (info->sizes == NULL) + return NULL; + return 
array_compressed_from_serialization_info(info, compressor->type); } @@ -738,8 +797,13 @@ tsl_array_compressor_finish(PG_FUNCTION_ARGS) { ArrayCompressor *compressor = (ArrayCompressor *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + void *compressed; if (compressor == NULL) PG_RETURN_NULL(); - PG_RETURN_POINTER(array_compressor_finish(compressor)); + compressed = array_compressor_finish(compressor); + if (compressed == NULL) + PG_RETURN_NULL(); + + PG_RETURN_POINTER(compressed); } diff --git a/tsl/src/compression/array.h b/tsl/src/compression/array.h index 2694ebbe8..6c0c163cd 100644 --- a/tsl/src/compression/array.h +++ b/tsl/src/compression/array.h @@ -24,10 +24,13 @@ typedef struct ArrayCompressor ArrayCompressor; typedef struct ArrayCompressed ArrayCompressed; typedef struct ArrayDecompressionIterator ArrayDecompressionIterator; +extern const Compressor array_compressor; + +extern Compressor *array_compressor_for_type(Oid element_type); extern ArrayCompressor *array_compressor_alloc(Oid type_to_compress); extern void array_compressor_append_null(ArrayCompressor *compressor); extern void array_compressor_append(ArrayCompressor *compressor, Datum val); -extern ArrayCompressed *array_compressor_finish(ArrayCompressor *compressor); +extern void *array_compressor_finish(ArrayCompressor *compressor); extern ArrayDecompressionIterator *array_decompression_iterator_alloc(void); extern DecompressionIterator * @@ -70,6 +73,7 @@ extern Datum tsl_array_compressor_finish(PG_FUNCTION_ARGS); .iterator_init_reverse = tsl_array_decompression_iterator_from_datum_reverse, \ .compressed_data_send = array_compressed_send, \ .compressed_data_recv = array_compressed_recv, \ + .compressor_for_type = array_compressor_for_type, \ } #endif diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index fb1a15806..2bf8f0bcb 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -4,29 +4,667 @@ * LICENSE-TIMESCALE for a copy 
of the license. */ -#include "compression.h" -#include +#include "compression/compression.h" + +#include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include #include "array.h" +#include "deltadelta.h" #include "dictionary.h" #include "gorilla.h" -#include "deltadelta.h" -static const CompressionAlgorithmDefinition definitions[] = { +#define MAX_ROWS_PER_COMPRESSION 1000 + +static const CompressionAlgorithmDefinition definitions[_END_COMPRESSION_ALGORITHMS] = { [COMPRESSION_ALGORITHM_ARRAY] = ARRAY_ALGORITHM_DEFINITION, [COMPRESSION_ALGORITHM_DICTIONARY] = DICTIONARY_ALGORITHM_DEFINITION, [COMPRESSION_ALGORITHM_GORILLA] = GORILLA_ALGORITHM_DEFINITION, [COMPRESSION_ALGORITHM_DELTADELTA] = DELTA_DELTA_ALGORITHM_DEFINITION, - }; +static Compressor * +compressor_for_algorithm_and_type(CompressionAlgorithms algorithm, Oid type) +{ + if (algorithm >= _END_COMPRESSION_ALGORITHMS) + elog(ERROR, "invalid compression algorithm %d", algorithm); + + return definitions[algorithm].compressor_for_type(type); +} + +typedef struct SegmentInfo +{ + Datum val; + FmgrInfo eq_fn; + FunctionCallInfoData eq_fcinfo; + int16 typlen; + bool is_null; + bool typ_by_val; +} SegmentInfo; + +typedef struct PerColumn +{ + /* the compressor to use for regular columns, NULL for segmenters */ + Compressor *compressor; + + /* segment info; only used if compressor is NULL */ + SegmentInfo *segment_info; +} PerColumn; + +typedef struct RowCompressor +{ + /* the table we're writing the compressed data to */ + Relation compressed_table; + BulkInsertState bistate; + + /* in theory we could have more input columns than outputted ones, so we + store the number of inputs/compressors seperately*/ + int n_input_columns; + + /* info about each column */ + struct PerColumn *per_column; + + /* the order of columns in the compressed data need not match the order in the + * uncompressed. 
This array maps each attribute offset in the uncompressed + * data to the corresponding one in the compressed + */ + int16 *uncompressed_col_to_compressed_col; + + /* the number of uncompressed rows compressed into the current compressed row */ + uint32 rows_compressed_into_current_value; + + /* cached arrays used to build the HeapTuple */ + Datum *compressed_values; + bool *compressed_is_null; +} RowCompressor; + +static int16 *compress_chunk_populate_keys(Oid in_table, const ColumnCompressionInfo **columns, + int n_columns, int *n_keys_out, + const ColumnCompressionInfo ***keys_out); +static Tuplesortstate *compress_chunk_sort_relation(Relation in_rel, int n_keys, + const ColumnCompressionInfo **keys); +static void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc, + Relation compressed_table, int num_compression_infos, + const ColumnCompressionInfo **column_compression_info, + int16 *column_offsets, int16 num_compressed_columns); +static void row_compressor_append_sorted_rows(RowCompressor *row_compressor, + Tuplesortstate *sorted_rel, TupleDesc sorted_desc); +static void row_compressor_finish(RowCompressor *row_compressor); + +/******************** + ** compress_chunk ** + ********************/ + +void +compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column_compression_info, + int num_compression_infos) +{ + int n_keys; + const ColumnCompressionInfo **keys; + int16 *in_column_offsets = compress_chunk_populate_keys(in_table, + column_compression_info, + num_compression_infos, + &n_keys, + &keys); + + /*We want to prevent other compressors from compressing this table, + * and we want to prevent INSERTs or UPDATEs which could mess up our compression. + * We may as well allow readers to keep reading the uncompressed data while + * we are compressing, so we only take an ExclusiveLock instead of AccessExclusive. 
+ */ + Relation in_rel = relation_open(in_table, ExclusiveLock); + /* we are _just_ INSERTing into the out_table so in principle we could take + * a RowExclusive lock, and let other operations read and write this table + * as we work. However, we currently compress each table as a oneshot, so + * we're taking the stricter lock to prevent accidents. + */ + Relation out_rel = relation_open(out_table, ExclusiveLock); + // TODO error if out_rel is non-empty + + TupleDesc in_desc = RelationGetDescr(in_rel); + TupleDesc out_desc = RelationGetDescr(out_rel); + + Tuplesortstate *sorted_rel = compress_chunk_sort_relation(in_rel, n_keys, keys); + + RowCompressor row_compressor; + + Assert(num_compression_infos <= in_desc->natts); + Assert(num_compression_infos <= out_desc->natts); + + row_compressor_init(&row_compressor, + in_desc, + out_rel, + num_compression_infos, + column_compression_info, + in_column_offsets, + out_desc->natts); + + row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc); + + row_compressor_finish(&row_compressor); + + tuplesort_end(sorted_rel); + + RelationClose(out_rel); + RelationClose(in_rel); +} + +static int16 * +compress_chunk_populate_keys(Oid in_table, const ColumnCompressionInfo **columns, int n_columns, + int *n_keys_out, const ColumnCompressionInfo ***keys_out) +{ + int16 *column_offsets = palloc(sizeof(*column_offsets) * n_columns); + + int i; + int n_segment_keys = 0; + *n_keys_out = 0; + + for (i = 0; i < n_columns; i++) + { + bool is_segmentby = columns[i]->segmentby_column_index >= 0; + bool is_orderby = columns[i]->orderby_column_index >= 0; + if (is_segmentby) + n_segment_keys += 1; + + if (is_segmentby || is_orderby) + *n_keys_out += 1; + } + + *keys_out = palloc(sizeof(**keys_out) * *n_keys_out); + + for (i = 0; i < n_columns; i++) + { + const ColumnCompressionInfo *column = columns[i]; + int16 segment_offset = column->segmentby_column_index; + int16 orderby_offset = column->orderby_column_index; + AttrNumber 
compressed_att; + if (segment_offset >= 0) + (*keys_out)[segment_offset] = column; + + if (columns[i]->orderby_column_index >= 0) + (*keys_out)[n_segment_keys + orderby_offset] = column; + + compressed_att = get_attnum(in_table, NameStr(column->attname)); + if (!AttributeNumberIsValid(compressed_att)) + elog(ERROR, "could not find compressed column for \"%s\"", NameStr(column->attname)); + + column_offsets[i] = AttrNumberGetAttrOffset(compressed_att); + } + + return column_offsets; +} + +static void compress_chunk_populate_sort_info_for_column(Oid table, + const ColumnCompressionInfo *column, + AttrNumber *att_nums, Oid *sort_operator, + Oid *collation, bool *nulls_first); + +static Tuplesortstate * +compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressionInfo **keys) +{ + TupleDesc tupDesc = RelationGetDescr(in_rel); + Tuplesortstate *tuplesortstate; + HeapTuple tuple; + HeapScanDesc heapScan; + TupleTableSlot *heap_tuple_slot = MakeTupleTableSlotCompat(tupDesc); + AttrNumber *sort_keys = palloc(sizeof(*sort_keys) * n_keys); + Oid *sort_operators = palloc(sizeof(*sort_operators) * n_keys); + Oid *sort_collations = palloc(sizeof(*sort_collations) * n_keys); + bool *nulls_first = palloc(sizeof(*nulls_first) * n_keys); + int n; + + for (n = 0; n < n_keys; n++) + compress_chunk_populate_sort_info_for_column(RelationGetRelid(in_rel), + keys[n], + &sort_keys[n], + &sort_operators[n], + &sort_collations[n], + &nulls_first[n]); + + tuplesortstate = tuplesort_begin_heap(tupDesc, + n_keys, + sort_keys, + sort_operators, + sort_collations, + nulls_first, + work_mem, +#if PG11 + NULL, +#endif + false /*=randomAccess*/); + + heapScan = heap_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); + for (tuple = heap_getnext(heapScan, ForwardScanDirection); tuple != NULL; + tuple = heap_getnext(heapScan, ForwardScanDirection)) + { + if (HeapTupleIsValid(tuple)) + { + // TODO is this the most efficient way to do this? 
+ // (since we use begin_heap() the tuplestore expects tupleslots, + // so ISTM that the options are this or maybe putdatum()) + ExecStoreTuple(tuple, heap_tuple_slot, InvalidBuffer, false); + tuplesort_puttupleslot(tuplesortstate, heap_tuple_slot); + } + } + + heap_endscan(heapScan); + + ExecDropSingleTupleTableSlot(heap_tuple_slot); + + tuplesort_performsort(tuplesortstate); + + return tuplesortstate; +} + +static void +compress_chunk_populate_sort_info_for_column(Oid table, const ColumnCompressionInfo *column, + AttrNumber *att_nums, Oid *sort_operator, + Oid *collation, bool *nulls_first) +{ + HeapTuple tp; + Form_pg_attribute att_tup; + TypeCacheEntry *tentry; + + tp = SearchSysCacheAttName(table, NameStr(column->attname)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "table %d does not have column \"%s\"", table, NameStr(column->attname)); + + att_tup = (Form_pg_attribute) GETSTRUCT(tp); + // TODO other valdation checks? + + *att_nums = att_tup->attnum; + *collation = att_tup->attcollation; + *nulls_first = column->segmentby_column_index < 0 && column->orderby_nullsfirst; + + tentry = lookup_type_cache(att_tup->atttypid, TYPECACHE_LT_OPR | TYPECACHE_GT_OPR); + + if (column->segmentby_column_index >= 0 || column->orderby_asc) + *sort_operator = tentry->lt_opr; + else + *sort_operator = tentry->gt_opr; + + if (!OidIsValid(*sort_operator)) + elog(ERROR, + "no valid sort operator for column \"%s\" of type \"%s\"", + NameStr(column->attname), + format_type_be(att_tup->atttypid)); + + ReleaseSysCache(tp); +} + +/******************** + ** row_compressor ** + ********************/ + +static void row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row); +static bool row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor, + TupleTableSlot *row); +static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row); +static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid); + +static 
SegmentInfo *segment_info_new(Form_pg_attribute column_attr); +static void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null); +static bool segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null); + +/* num_compression_infos is the number of columns we will write to in the compressed table */ +static void +row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc, + Relation compressed_table, int num_compression_infos, + const ColumnCompressionInfo **column_compression_info, int16 *in_column_offsets, + int16 num_compressed_columns) +{ + TupleDesc out_desc = RelationGetDescr(compressed_table); + int col; + *row_compressor = (RowCompressor){ + .compressed_table = compressed_table, + .bistate = GetBulkInsertState(), + .n_input_columns = uncompressed_tuple_desc->natts, + .per_column = palloc0(sizeof(PerColumn) * uncompressed_tuple_desc->natts), + .uncompressed_col_to_compressed_col = + palloc0(sizeof(*row_compressor->uncompressed_col_to_compressed_col) * + uncompressed_tuple_desc->natts), + .compressed_values = palloc(sizeof(Datum) * num_compressed_columns), + .compressed_is_null = palloc(sizeof(bool) * num_compressed_columns), + .rows_compressed_into_current_value = 0, + }; + + memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_compressed_columns); + + for (col = 0; col < num_compression_infos; col++) + { + const ColumnCompressionInfo *compression_info = column_compression_info[col]; + /* we want row_compressor.per_column to be in the same order as the underlying table */ + int16 in_column_offset = in_column_offsets[col]; + PerColumn *column = &row_compressor->per_column[in_column_offset]; + Form_pg_attribute column_attr = TupleDescAttr(uncompressed_tuple_desc, in_column_offset); + AttrNumber compressed_colnum = + attno_find_by_attname(out_desc, (Name) &compression_info->attname); + row_compressor->uncompressed_col_to_compressed_col[in_column_offset] = + 
AttrNumberGetAttrOffset(compressed_colnum); + Assert(AttrNumberGetAttrOffset(compressed_colnum) < num_compressed_columns); + + if (compression_info->segmentby_column_index < 0) + { + *column = (PerColumn){ + .compressor = compressor_for_algorithm_and_type(compression_info->algo_id, + column_attr->atttypid), + }; + } + else + { + *column = (PerColumn){ + .segment_info = segment_info_new(column_attr), + }; + } + } +} + +static void +row_compressor_append_sorted_rows(RowCompressor *row_compressor, Tuplesortstate *sorted_rel, + TupleDesc sorted_desc) +{ + CommandId mycid = GetCurrentCommandId(true); + TupleTableSlot *slot = MakeTupleTableSlotCompat(sorted_desc); + bool got_tuple; + bool first_iteration = true; + for (got_tuple = tuplesort_gettupleslot(sorted_rel, + true /*=forward*/, + false /*=copy*/, + slot, + NULL /*=abbrev*/); + got_tuple; + got_tuple = tuplesort_gettupleslot(sorted_rel, + true /*=forward*/, + false /*=copy*/, + slot, + NULL /*=abbrev*/)) + { + bool changed_groups, compressed_row_is_full; + slot_getallattrs(slot); + + /* first time through */ + if (first_iteration) + { + row_compressor_update_group(row_compressor, slot); + first_iteration = false; + } + + changed_groups = row_compressor_new_row_is_in_new_group(row_compressor, slot); + compressed_row_is_full = + row_compressor->rows_compressed_into_current_value >= MAX_ROWS_PER_COMPRESSION; + if (compressed_row_is_full || changed_groups) + { + if (row_compressor->rows_compressed_into_current_value > 0) + row_compressor_flush(row_compressor, mycid); + if (changed_groups) + row_compressor_update_group(row_compressor, slot); + } + + row_compressor_append_row(row_compressor, slot); + ExecClearTuple(slot); + } + + if (row_compressor->rows_compressed_into_current_value > 0) + row_compressor_flush(row_compressor, mycid); + + ExecDropSingleTupleTableSlot(slot); +} + +static void +row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row) +{ + int col; + + 
Assert(row_compressor->rows_compressed_into_current_value == 0); + Assert(row_compressor->n_input_columns <= row->tts_nvalid); + + for (col = 0; col < row_compressor->n_input_columns; col++) + { + PerColumn *column = &row_compressor->per_column[col]; + Datum val; + bool is_null; + + if (column->segment_info == NULL) + continue; + + Assert(column->compressor == NULL); + + // TODO we should just use array access here; everything is guaranteed to be fetched + val = slot_getattr(row, AttrOffsetGetAttrNumber(col), &is_null); + segment_info_update(column->segment_info, val, is_null); + } +} + +static bool +row_compressor_new_row_is_in_new_group(RowCompressor *row_compressor, TupleTableSlot *row) +{ + int col; + for (col = 0; col < row_compressor->n_input_columns; col++) + { + PerColumn *column = &row_compressor->per_column[col]; + Datum datum = CharGetDatum(0); + bool is_null; + + if (column->segment_info == NULL) + continue; + + Assert(column->compressor == NULL); + + datum = slot_getattr(row, AttrOffsetGetAttrNumber(col), &is_null); + + if (!segment_info_datum_is_in_group(column->segment_info, datum, is_null)) + return true; + } + + return false; +} + +static void +row_compressor_append_row(RowCompressor *row_compressor, TupleTableSlot *row) +{ + int col; + for (col = 0; col < row_compressor->n_input_columns; col++) + { + Compressor *compressor = row_compressor->per_column[col].compressor; + bool is_null; + Datum val; + + /* if there is no compressor, this must be a segmenter, so just skip */ + if (compressor == NULL) + continue; + + // TODO since we call getallatts at the beginning, slot_getattr is useless + // overhead here, and we should just access the array directly + val = slot_getattr(row, AttrOffsetGetAttrNumber(col), &is_null); + if (is_null) + compressor->append_null(compressor); + else + compressor->append_val(compressor, val); + } + + row_compressor->rows_compressed_into_current_value += 1; +} + +static void +row_compressor_flush(RowCompressor 
*row_compressor, CommandId mycid) +{ + int16 col; + HeapTuple compressed_tuple; + + for (col = 0; col < row_compressor->n_input_columns; col++) + { + PerColumn *column = &row_compressor->per_column[col]; + Compressor *compressor; + int16 compressed_col; + if (column->compressor == NULL && column->segment_info == NULL) + continue; + + compressor = column->compressor; + compressed_col = row_compressor->uncompressed_col_to_compressed_col[col]; + + Assert(compressed_col >= 0); + + if (compressor != NULL) + { + void *compressed_data; + Assert(column->segment_info == NULL); + Assert(compressed_col < row_compressor->n_input_columns); + + compressed_data = compressor->finish(compressor); + + /* non-segment columns are NULL iff all the values are NULL */ + row_compressor->compressed_is_null[compressed_col] = compressed_data == NULL; + if (compressed_data != NULL) + row_compressor->compressed_values[compressed_col] = + PointerGetDatum(compressed_data); + } + else if (column->segment_info != NULL) + { + row_compressor->compressed_values[compressed_col] = column->segment_info->val; + row_compressor->compressed_is_null[compressed_col] = column->segment_info->is_null; + } + } + + compressed_tuple = heap_form_tuple(RelationGetDescr(row_compressor->compressed_table), + row_compressor->compressed_values, + row_compressor->compressed_is_null); + heap_insert(row_compressor->compressed_table, + compressed_tuple, + mycid, + 0 /*=options*/, + row_compressor->bistate); + + /* free the compressed values now that we're done with them (the old compressor is freed in + * finish()) */ + for (col = 0; col < row_compressor->n_input_columns; col++) + { + PerColumn *column = &row_compressor->per_column[col]; + int16 compressed_col; + if (column->compressor == NULL && column->segment_info == NULL) + continue; + + compressed_col = row_compressor->uncompressed_col_to_compressed_col[col]; + Assert(compressed_col >= 0); + Assert(compressed_col < row_compressor->n_input_columns); + if 
(row_compressor->compressed_is_null[compressed_col]) + continue; + + if (column->compressor != NULL || !column->segment_info->typ_by_val) + pfree(DatumGetPointer(row_compressor->compressed_values[compressed_col])); + + row_compressor->compressed_values[compressed_col] = 0; + row_compressor->compressed_is_null[compressed_col] = true; + } + + row_compressor->rows_compressed_into_current_value = 0; +} + +static void +row_compressor_finish(RowCompressor *row_compressor) +{ + FreeBulkInsertState(row_compressor->bistate); +} + +/****************** + ** segment_info ** + ******************/ + +static SegmentInfo * +segment_info_new(Form_pg_attribute column_attr) +{ + Oid eq_fn_oid = + lookup_type_cache(column_attr->atttypid, TYPECACHE_EQ_OPR_FINFO)->eq_opr_finfo.fn_oid; + SegmentInfo *segment_info = palloc(sizeof(*segment_info)); + + *segment_info = (SegmentInfo){ + .typlen = column_attr->attlen, + .typ_by_val = column_attr->attbyval, + }; + + if (!OidIsValid(eq_fn_oid)) + elog(ERROR, "no equality function for column \"%s\"", NameStr(column_attr->attname)); + fmgr_info_cxt(eq_fn_oid, &segment_info->eq_fn, CurrentMemoryContext); + + InitFunctionCallInfoData(segment_info->eq_fcinfo, + &segment_info->eq_fn /*=Flinfo*/, + 2 /*=Nargs*/, + column_attr->attcollation /*=Collation*/, + NULL, /*=Context*/ + NULL /*=ResultInfo*/ + ); + + return segment_info; +} + +static void +segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null) +{ + segment_info->is_null = is_null; + if (is_null) + segment_info->val = 0; + else + segment_info->val = datumCopy(val, segment_info->typ_by_val, segment_info->typlen); +} + +static bool +segment_info_datum_is_in_group(SegmentInfo *segment_info, Datum datum, bool is_null) +{ + Datum data_is_eq; + FunctionCallInfoData *eq_fcinfo; + /* if one of the datums is null and the other isn't, we must be in a new group */ + if (segment_info->is_null != is_null) + return false; + + /* they're both null */ + if (segment_info->is_null) + return true; 
+ + /* neither is null, call the eq function */ + eq_fcinfo = &segment_info->eq_fcinfo; + eq_fcinfo->arg[0] = segment_info->val; + eq_fcinfo->argnull[0] = false; + + eq_fcinfo->arg[1] = datum; + eq_fcinfo->isnull = false; + + data_is_eq = FunctionCallInvoke(eq_fcinfo); + + if (eq_fcinfo->isnull) + return false; + + return DatumGetBool(data_is_eq); +} + +/********************/ +/*** SQL Bindings ***/ +/********************/ + Datum tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS) { - Datum compressed = PG_GETARG_DATUM(0); + Datum compressed = PG_ARGISNULL(0) ? ({ + PG_RETURN_NULL(); + 0; + }) : + PG_GETARG_DATUM(0); CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(compressed); FuncCallContext *funcctx; MemoryContext oldcontext; @@ -63,7 +701,11 @@ tsl_compressed_data_decompress_forward(PG_FUNCTION_ARGS) Datum tsl_compressed_data_decompress_reverse(PG_FUNCTION_ARGS) { - Datum compressed = PG_GETARG_DATUM(0); + Datum compressed = PG_ARGISNULL(0) ? ({ + PG_RETURN_NULL(); + 0; + }) : + PG_GETARG_DATUM(0); CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(compressed); FuncCallContext *funcctx; MemoryContext oldcontext; diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index 6b76c1b3a..46aad6ef3 100644 --- a/tsl/src/compression/compression.h +++ b/tsl/src/compression/compression.h @@ -48,6 +48,17 @@ typedef struct DecompressResult bool is_done; } DecompressResult; +/* Forward declaration of ColumnCompressionInfo so we don't need to include catalog.h */ +typedef struct FormData_hypertable_compression ColumnCompressionInfo; + +typedef struct Compressor Compressor; +struct Compressor +{ + void (*append_null)(Compressor *compressord); + void (*append_val)(Compressor *compressor, Datum val); + void *(*finish)(Compressor *data); +}; + typedef struct DecompressionIterator { uint8 compression_algorithm; @@ -63,6 +74,8 @@ typedef struct CompressionAlgorithmDefinition DecompressionIterator 
*(*iterator_init_reverse)(Datum, Oid element_type); void (*compressed_data_send)(CompressedDataHeader *, StringInfo); Datum (*compressed_data_recv)(StringInfo); + + Compressor *(*compressor_for_type)(Oid element_type); } CompressionAlgorithmDefinition; typedef enum CompressionAlgorithms @@ -74,8 +87,8 @@ typedef enum CompressionAlgorithms COMPRESSION_ALGORITHM_DICTIONARY, COMPRESSION_ALGORITHM_GORILLA, COMPRESSION_ALGORITHM_DELTADELTA, - /* When adding an algorithm also add a static assert statement below */ + /* When adding an algorithm also add a static assert statement below */ /* end of real values */ _END_COMPRESSION_ALGORITHMS, _MAX_NUM_COMPRESSION_ALGORITHMS = 128, @@ -109,4 +122,7 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void) "number of algorithms have changed, the asserts should be updated"); } +extern void compress_chunk(Oid in_table, Oid out_table, + const ColumnCompressionInfo **column_compression_info, int num_columns); + #endif diff --git a/tsl/src/compression/deltadelta.c b/tsl/src/compression/deltadelta.c index 710f3c9da..75718af78 100644 --- a/tsl/src/compression/deltadelta.c +++ b/tsl/src/compression/deltadelta.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -71,6 +72,159 @@ typedef struct DeltaDeltaCompressor bool has_nulls; } DeltaDeltaCompressor; +typedef struct ExtendedCompressor +{ + Compressor base; + DeltaDeltaCompressor *internal; +} ExtendedCompressor; + +static void +deltadelta_compressor_append_int16(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = delta_delta_compressor_alloc(); + + delta_delta_compressor_append_value(extended->internal, DatumGetInt16(val)); +} + +static void +deltadelta_compressor_append_int32(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = 
delta_delta_compressor_alloc(); + + delta_delta_compressor_append_value(extended->internal, DatumGetInt32(val)); +} + +static void +deltadelta_compressor_append_int64(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = delta_delta_compressor_alloc(); + + delta_delta_compressor_append_value(extended->internal, DatumGetInt64(val)); +} + +static void +deltadelta_compressor_append_date(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = delta_delta_compressor_alloc(); + + delta_delta_compressor_append_value(extended->internal, DatumGetDateADT(val)); +} + +static void +deltadelta_compressor_append_timestamp(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = delta_delta_compressor_alloc(); + + delta_delta_compressor_append_value(extended->internal, DatumGetTimestamp(val)); +} + +static void +deltadelta_compressor_append_timestamptz(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = delta_delta_compressor_alloc(); + + delta_delta_compressor_append_value(extended->internal, DatumGetTimestampTz(val)); +} + +static void +deltadelta_compressor_append_null_value(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = delta_delta_compressor_alloc(); + + delta_delta_compressor_append_null(extended->internal); +} + +static void *delta_delta_compressor_finish(DeltaDeltaCompressor *compressor); + +static void * +deltadelta_compressor_finish_and_reset(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + void 
*compressed = delta_delta_compressor_finish(extended->internal); + pfree(extended->internal); + extended->internal = NULL; + return compressed; +} + +const Compressor deltadelta_uint16_compressor = { + .append_val = deltadelta_compressor_append_int16, + .append_null = deltadelta_compressor_append_null_value, + .finish = deltadelta_compressor_finish_and_reset, +}; +const Compressor deltadelta_uint32_compressor = { + .append_val = deltadelta_compressor_append_int32, + .append_null = deltadelta_compressor_append_null_value, + .finish = deltadelta_compressor_finish_and_reset, +}; +const Compressor deltadelta_uint64_compressor = { + .append_val = deltadelta_compressor_append_int64, + .append_null = deltadelta_compressor_append_null_value, + .finish = deltadelta_compressor_finish_and_reset, +}; + +const Compressor deltadelta_date_compressor = { + .append_val = deltadelta_compressor_append_date, + .append_null = deltadelta_compressor_append_null_value, + .finish = deltadelta_compressor_finish_and_reset, +}; + +const Compressor deltadelta_timestamp_compressor = { + .append_val = deltadelta_compressor_append_timestamp, + .append_null = deltadelta_compressor_append_null_value, + .finish = deltadelta_compressor_finish_and_reset, +}; + +const Compressor deltadelta_timestamptz_compressor = { + .append_val = deltadelta_compressor_append_timestamptz, + .append_null = deltadelta_compressor_append_null_value, + .finish = deltadelta_compressor_finish_and_reset, +}; + +Compressor * +delta_delta_compressor_for_type(Oid element_type) +{ + ExtendedCompressor *compressor = palloc(sizeof(*compressor)); + switch (element_type) + { + case INT2OID: + *compressor = (ExtendedCompressor){ .base = deltadelta_uint16_compressor }; + return &compressor->base; + case INT4OID: + *compressor = (ExtendedCompressor){ .base = deltadelta_uint32_compressor }; + return &compressor->base; + case INT8OID: + *compressor = (ExtendedCompressor){ .base = deltadelta_uint64_compressor }; + return &compressor->base; 
+ case DATEOID: + *compressor = (ExtendedCompressor){ .base = deltadelta_date_compressor }; + return &compressor->base; +#ifdef HAVE_INT64_TIMESTAMP + case TIMESTAMPOID: + *compressor = (ExtendedCompressor){ .base = deltadelta_timestamp_compressor }; + return &compressor->base; + case TIMESTAMPTZOID: + *compressor = (ExtendedCompressor){ .base = deltadelta_timestamptz_compressor }; + return &compressor->base; +#endif + default: + elog(ERROR, "invalid type for delta-delta compressor %d", element_type); + } +} + Datum tsl_deltadelta_compressor_append(PG_FUNCTION_ARGS) { @@ -157,20 +311,38 @@ delta_delta_from_parts(uint64 first_value, uint64 last_value, uint64 last_delta, return compressed; } +static void * +delta_delta_compressor_finish(DeltaDeltaCompressor *compressor) +{ + Simple8bRleSerialized *deltas = simple8brle_compressor_finish(&compressor->delta_delta); + Simple8bRleSerialized *nulls = simple8brle_compressor_finish(&compressor->nulls); + DeltaDeltaCompressed *compressed; + + if (deltas == NULL) + return NULL; + + compressed = delta_delta_from_parts(compressor->start, + compressor->prev_val, + compressor->prev_delta, + deltas, + compressor->has_nulls ? nulls : NULL); + + Assert(compressed->compression_algorithm == COMPRESSION_ALGORITHM_DELTADELTA); + return compressed; +} + Datum tsl_deltadelta_compressor_finish(PG_FUNCTION_ARGS) { - DeltaDeltaCompressor *compressor = (DeltaDeltaCompressor *) PG_GETARG_POINTER(0); + DeltaDeltaCompressor *compressor = + PG_ARGISNULL(0) ? NULL : (DeltaDeltaCompressor *) PG_GETARG_POINTER(0); + void *compressed; + if (compressor == NULL) + PG_RETURN_NULL(); - Simple8bRleSerialized *deltas = simple8brle_compressor_finish(&compressor->delta_delta); - Simple8bRleSerialized *nulls = simple8brle_compressor_finish(&compressor->nulls); - DeltaDeltaCompressed *compressed = delta_delta_from_parts(compressor->start, - compressor->prev_val, - compressor->prev_delta, - deltas, - compressor->has_nulls ? 
nulls : NULL); - - Assert(compressed->compression_algorithm == COMPRESSION_ALGORITHM_DELTADELTA); + compressed = delta_delta_compressor_finish(compressor); + if (compressed == NULL) + PG_RETURN_NULL(); PG_RETURN_POINTER(compressed); } diff --git a/tsl/src/compression/deltadelta.h b/tsl/src/compression/deltadelta.h index 8e8a7f71e..2a50246f7 100644 --- a/tsl/src/compression/deltadelta.h +++ b/tsl/src/compression/deltadelta.h @@ -29,6 +29,7 @@ typedef struct DeltaDeltaCompressor DeltaDeltaCompressor; typedef struct DeltaDeltaCompressed DeltaDeltaCompressed; typedef struct DeltaDeltaDecompressionIterator DeltaDeltaDecompressionIterator; +extern Compressor *delta_delta_compressor_for_type(Oid element_type); extern DeltaDeltaCompressor *delta_delta_compressor_alloc(void); extern void delta_delta_compressor_append_null(DeltaDeltaCompressor *compressor); extern void delta_delta_compressor_append_value(DeltaDeltaCompressor *compressor, int64 next_val); @@ -56,6 +57,7 @@ extern Datum tsl_deltadelta_compressor_finish(PG_FUNCTION_ARGS); .iterator_init_reverse = delta_delta_decompression_iterator_from_datum_reverse, \ .compressed_data_send = deltadelta_compressed_send, \ .compressed_data_recv = deltadelta_compressed_recv, \ + .compressor_for_type = delta_delta_compressor_for_type, \ } #endif diff --git a/tsl/src/compression/dictionary.c b/tsl/src/compression/dictionary.c index 982b9d0e3..5524cbb3b 100644 --- a/tsl/src/compression/dictionary.c +++ b/tsl/src/compression/dictionary.c @@ -80,6 +80,60 @@ typedef struct DictionaryCompressor Simple8bRleCompressor nulls; } DictionaryCompressor; +typedef struct ExtendedCompressor +{ + Compressor base; + DictionaryCompressor *internal; + Oid element_type; +} ExtendedCompressor; + +static void +dictionary_compressor_append_datum(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = 
dictionary_compressor_alloc(extended->element_type); + + dictionary_compressor_append(extended->internal, val); +} + +static void +dictionary_compressor_append_null_value(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = dictionary_compressor_alloc(extended->element_type); + + dictionary_compressor_append_null(extended->internal); +} + +static void * +dictionary_compressor_finish_and_reset(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + void *compressed = dictionary_compressor_finish(extended->internal); + pfree(extended->internal); + extended->internal = NULL; + return compressed; +} + +const Compressor dictionary_compressor = { + .append_val = dictionary_compressor_append_datum, + .append_null = dictionary_compressor_append_null_value, + .finish = dictionary_compressor_finish_and_reset, +}; + +Compressor * +dictionary_compressor_for_type(Oid element_type) +{ + ExtendedCompressor *compressor = palloc(sizeof(*compressor)); + *compressor = (ExtendedCompressor){ + .base = dictionary_compressor, + .element_type = element_type, + }; + return &compressor->base; +} + DictionaryCompressor * dictionary_compressor_alloc(Oid type) { @@ -140,6 +194,7 @@ typedef struct DictionaryCompressorSerializationInfo Simple8bRleSerialized *dictionary_compressed_indexes; Simple8bRleSerialized *compressed_nulls; ArrayCompressorSerializationInfo *dictionary_serialization_info; + bool is_all_null; } DictionaryCompressorSerializationInfo; static DictionaryCompressorSerializationInfo @@ -160,6 +215,9 @@ compressor_get_serialization_info(DictionaryCompressor *compressor) }; Size header_size = sizeof(DictionaryCompressed); + if (sizes.dictionary_compressed_indexes == NULL) + return (DictionaryCompressorSerializationInfo){ .is_all_null = true }; + sizes.bitmaps_size = simple8brle_serialized_total_size(dict_indexes); sizes.total_size = 
MAXALIGN(header_size) + sizes.bitmaps_size; if (compressor->has_nulls) @@ -221,10 +279,13 @@ dictionary_compressed_from_serialization_info(DictionaryCompressorSerializationI return bitmap; } -DictionaryCompressed * +void * dictionary_compressor_finish(DictionaryCompressor *compressor) { DictionaryCompressorSerializationInfo sizes = compressor_get_serialization_info(compressor); + if (sizes.is_all_null) + return NULL; + return dictionary_compressed_from_serialization_info(sizes, compressor->type); } @@ -439,10 +500,15 @@ tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS) { DictionaryCompressor *compressor = (DictionaryCompressor *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + void *compressed; if (compressor == NULL) PG_RETURN_NULL(); - PG_RETURN_POINTER(dictionary_compressor_finish(compressor)); + compressed = dictionary_compressor_finish(compressor); + if (compressed == NULL) + PG_RETURN_NULL(); + + PG_RETURN_POINTER(compressed); } ///////////////////// diff --git a/tsl/src/compression/dictionary.h b/tsl/src/compression/dictionary.h index 70b5ed362..34fa4a0ae 100644 --- a/tsl/src/compression/dictionary.h +++ b/tsl/src/compression/dictionary.h @@ -25,10 +25,11 @@ typedef struct DictionaryCompressor DictionaryCompressor; typedef struct DictionaryCompressed DictionaryCompressed; typedef struct DictionaryDecompressionIterator DictionaryDecompressionIterator; +extern Compressor *dictionary_compressor_for_type(Oid element_type); extern DictionaryCompressor *dictionary_compressor_alloc(Oid type_to_compress); extern void dictionary_compressor_append_null(DictionaryCompressor *compressor); extern void dictionary_compressor_append(DictionaryCompressor *compressor, Datum val); -extern DictionaryCompressed *dictionary_compressor_finish(DictionaryCompressor *compressor); +extern void *dictionary_compressor_finish(DictionaryCompressor *compressor); extern DecompressionIterator * tsl_dictionary_decompression_iterator_from_datum_forward(Datum dictionary_compressed, @@ -54,6 
+55,7 @@ extern Datum tsl_dictionary_compressor_finish(PG_FUNCTION_ARGS); .iterator_init_reverse = tsl_dictionary_decompression_iterator_from_datum_reverse, \ .compressed_data_send = dictionary_compressed_send, \ .compressed_data_recv = dictionary_compressed_recv, \ + .compressor_for_type = dictionary_compressor_for_type \ } #endif diff --git a/tsl/src/compression/gorilla.c b/tsl/src/compression/gorilla.c index 83d872fee..e45808361 100644 --- a/tsl/src/compression/gorilla.c +++ b/tsl/src/compression/gorilla.c @@ -100,6 +100,12 @@ typedef struct GorillaCompressor bool has_nulls; } GorillaCompressor; +typedef struct ExtendedCompressor +{ + Compressor base; + GorillaCompressor *internal; +} ExtendedCompressor; + typedef struct GorillaDecompressionIterator { DecompressionIterator base; @@ -203,6 +209,131 @@ pg_rightmost_one_pos64(uint64 word) *** Compressor *** ********************/ +static void +gorilla_compressor_append_float(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + uint64 value = float_get_bits(DatumGetFloat4(val)); + if (extended->internal == NULL) + extended->internal = gorilla_compressor_alloc(); + + gorilla_compressor_append_value(extended->internal, value); +} + +static void +gorilla_compressor_append_double(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + uint64 value = double_get_bits(DatumGetFloat8(val)); + if (extended->internal == NULL) + extended->internal = gorilla_compressor_alloc(); + + gorilla_compressor_append_value(extended->internal, value); +} + +static void +gorilla_compressor_append_int16(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = gorilla_compressor_alloc(); + + gorilla_compressor_append_value(extended->internal, (uint16) DatumGetInt16(val)); +} + +static void +gorilla_compressor_append_int32(Compressor 
*compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = gorilla_compressor_alloc(); + + gorilla_compressor_append_value(extended->internal, (uint32) DatumGetInt32(val)); +} + +static void +gorilla_compressor_append_int64(Compressor *compressor, Datum val) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = gorilla_compressor_alloc(); + + gorilla_compressor_append_value(extended->internal, DatumGetInt64(val)); +} + +static void +gorilla_compressor_append_null_value(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + if (extended->internal == NULL) + extended->internal = gorilla_compressor_alloc(); + + gorilla_compressor_append_null(extended->internal); +} + +static void * +gorilla_compressor_finish_and_reset(Compressor *compressor) +{ + ExtendedCompressor *extended = (ExtendedCompressor *) compressor; + void *compressed = gorilla_compressor_finish(extended->internal); + pfree(extended->internal); + extended->internal = NULL; + return compressed; +} + +const Compressor gorilla_float_compressor = { + .append_val = gorilla_compressor_append_float, + .append_null = gorilla_compressor_append_null_value, + .finish = gorilla_compressor_finish_and_reset, +}; + +const Compressor gorilla_double_compressor = { + .append_val = gorilla_compressor_append_double, + .append_null = gorilla_compressor_append_null_value, + .finish = gorilla_compressor_finish_and_reset, +}; +const Compressor gorilla_uint16_compressor = { + .append_val = gorilla_compressor_append_int16, + .append_null = gorilla_compressor_append_null_value, + .finish = gorilla_compressor_finish_and_reset, +}; +const Compressor gorilla_uint32_compressor = { + .append_val = gorilla_compressor_append_int32, + .append_null = gorilla_compressor_append_null_value, + .finish = gorilla_compressor_finish_and_reset, 
+}; +const Compressor gorilla_uint64_compressor = { + .append_val = gorilla_compressor_append_int64, + .append_null = gorilla_compressor_append_null_value, + .finish = gorilla_compressor_finish_and_reset, +}; + +Compressor * +gorilla_compressor_for_type(Oid element_type) +{ + ExtendedCompressor *compressor = palloc(sizeof(*compressor)); + switch (element_type) + { + case FLOAT4OID: + *compressor = (ExtendedCompressor){ .base = gorilla_float_compressor }; + return &compressor->base; + case FLOAT8OID: + *compressor = (ExtendedCompressor){ .base = gorilla_double_compressor }; + return &compressor->base; + case INT2OID: + *compressor = (ExtendedCompressor){ .base = gorilla_uint16_compressor }; + return &compressor->base; + case INT4OID: + *compressor = (ExtendedCompressor){ .base = gorilla_uint32_compressor }; + return &compressor->base; + case INT8OID: + *compressor = (ExtendedCompressor){ .base = gorilla_uint64_compressor }; + return &compressor->base; + default: + elog(ERROR, "invalid type for Gorilla compression %d", element_type); + } +} + GorillaCompressor * gorilla_compressor_alloc(void) { @@ -261,30 +392,41 @@ gorilla_compressor_append_null(GorillaCompressor *compressor) void gorilla_compressor_append_value(GorillaCompressor *compressor, uint64 val) { + bool has_values; uint64 xor = compressor->prev_val ^ val; simple8brle_compressor_append(&compressor->nulls, 0); - if (xor == 0) + /* for the first value we store the bitsize even if the xor is all zeroes, + * this ensures that the bits-per-xor isn't empty, and that we can calculate + * the remaining offsets correctly. 
+ */ + has_values = !simple8brle_compressor_is_empty(&compressor->bits_used_per_xor); + + if (has_values && xor == 0) simple8brle_compressor_append(&compressor->tag0s, 0); else { - int leading_zeros = 63 - pg_leftmost_one_pos64(xor); - int trailing_zeros = pg_rightmost_one_pos64(xor); + /* leftmost/rightmost 1 is not well-defined when all the bits in the number + * are 0; the C implementations of these functions will ERROR, while the + * assembly versions may return any value. We special-case 0 to use + * values for leading and trailing-zeroes that we know will work. + */ + int leading_zeros = xor != 0 ? 63 - pg_leftmost_one_pos64(xor) : 63; + int trailing_zeros = xor != 0 ? pg_rightmost_one_pos64(xor) : 1; /* TODO this can easily get stuck with a bad value for trailing_zeroes * we use a new trailing_zeroes if th delta is too large, but the * threshold was picked in a completely unprincipled manner. * Needs benchmarking */ - bool reuse_bitsizes = leading_zeros >= compressor->prev_leading_zeroes && + bool reuse_bitsizes = has_values && leading_zeros >= compressor->prev_leading_zeroes && trailing_zeros >= compressor->prev_trailing_zeros && - (leading_zeros - compressor->prev_leading_zeroes) + - (trailing_zeros - compressor->prev_trailing_zeros) <= - 12; + ((leading_zeros - compressor->prev_leading_zeroes) + + (trailing_zeros - compressor->prev_trailing_zeros) <= + 12); uint8 num_bits_used; simple8brle_compressor_append(&compressor->tag0s, 1); simple8brle_compressor_append(&compressor->tag1s, reuse_bitsizes ? 
0 : 1); - if (!reuse_bitsizes) { compressor->prev_leading_zeroes = leading_zeros; @@ -357,7 +499,7 @@ compressed_gorilla_data_serialize(CompressedGorillaData *input) return compressed; } -GorillaCompressed * +void * gorilla_compressor_finish(GorillaCompressor *compressor) { GorillaCompressed header = { @@ -367,11 +509,22 @@ gorilla_compressor_finish(GorillaCompressor *compressor) }; CompressedGorillaData data = { .header = &header }; data.tag0s = simple8brle_compressor_finish(&compressor->tag0s); + if (data.tag0s == NULL) + return NULL; + data.tag1s = simple8brle_compressor_finish(&compressor->tag1s); + Assert(data.tag1s != NULL); data.leading_zeros = compressor->leading_zeros; + /* if all elements in the compressed data are the same, there will be no xors, + * and thus bits_used_per_xor will be empty. Since we need to store the header + * to get the sizing right, we force at least one bits_used_per_xor to be created + * in append, above + */ data.num_bits_used_per_xor = simple8brle_compressor_finish(&compressor->bits_used_per_xor); + Assert(data.num_bits_used_per_xor != NULL); data.xors = compressor->xors; data.nulls = simple8brle_compressor_finish(&compressor->nulls); + Assert(compressor->has_nulls || data.nulls != NULL); return compressed_gorilla_data_serialize(&data); } @@ -381,10 +534,15 @@ tsl_gorilla_compressor_finish(PG_FUNCTION_ARGS) { GorillaCompressor *compressor = (GorillaCompressor *) (PG_ARGISNULL(0) ? 
NULL : PG_GETARG_POINTER(0)); + void *compressed; if (compressor == NULL) PG_RETURN_NULL(); - PG_RETURN_POINTER(gorilla_compressor_finish(compressor)); + compressed = gorilla_compressor_finish(compressor); + if (compressed == NULL) + PG_RETURN_NULL(); + + PG_RETURN_POINTER(compressed); } /******************************* @@ -430,9 +588,9 @@ compressed_gorilla_data_init_from_pointer(CompressedGorillaData *expanded, static void compressed_gorilla_data_init_from_datum(CompressedGorillaData *data, Datum gorilla_compressed) { - return compressed_gorilla_data_init_from_pointer(data, - (GorillaCompressed *) PG_DETOAST_DATUM( - gorilla_compressed)); + compressed_gorilla_data_init_from_pointer(data, + (GorillaCompressed *) PG_DETOAST_DATUM( + gorilla_compressed)); } DecompressionIterator * diff --git a/tsl/src/compression/gorilla.h b/tsl/src/compression/gorilla.h index bf77b4e87..a339fefc4 100644 --- a/tsl/src/compression/gorilla.h +++ b/tsl/src/compression/gorilla.h @@ -72,10 +72,12 @@ typedef struct GorillaCompressor GorillaCompressor; typedef struct GorillaCompressed GorillaCompressed; typedef struct GorillaDecompressionIterator GorillaDecompressionIterator; +extern Compressor *gorilla_compressor_for_type(Oid element_type); + extern GorillaCompressor *gorilla_compressor_alloc(void); extern void gorilla_compressor_append_null(GorillaCompressor *compressor); extern void gorilla_compressor_append_value(GorillaCompressor *compressor, uint64 val); -extern GorillaCompressed *gorilla_compressor_finish(GorillaCompressor *compressor); +extern void *gorilla_compressor_finish(GorillaCompressor *compressor); extern DecompressionIterator * gorilla_decompression_iterator_from_datum_forward(Datum dictionary_compressed, Oid element_type); @@ -99,6 +101,7 @@ extern Datum tsl_gorilla_compressor_finish(PG_FUNCTION_ARGS); .iterator_init_reverse = gorilla_decompression_iterator_from_datum_reverse, \ .compressed_data_send = gorilla_compressed_send, \ .compressed_data_recv = 
gorilla_compressed_recv, \ + .compressor_for_type = gorilla_compressor_for_type, \ } #endif diff --git a/tsl/src/compression/simple8b_rle.h b/tsl/src/compression/simple8b_rle.h index 5496f3a30..b133e8045 100644 --- a/tsl/src/compression/simple8b_rle.h +++ b/tsl/src/compression/simple8b_rle.h @@ -133,26 +133,27 @@ typedef struct Simple8bRleDecompressResult bool is_done; } Simple8bRleDecompressResult; -static void simple8brle_compressor_init(Simple8bRleCompressor *compressor); -static Simple8bRleSerialized *simple8brle_compressor_finish(Simple8bRleCompressor *compressor); -static void simple8brle_compressor_append(Simple8bRleCompressor *compressor, uint64 val); +static inline void simple8brle_compressor_init(Simple8bRleCompressor *compressor); +static inline Simple8bRleSerialized *simple8brle_compressor_finish(Simple8bRleCompressor *compressor); +static inline void simple8brle_compressor_append(Simple8bRleCompressor *compressor, uint64 val); +static inline bool simple8brle_compressor_is_empty(Simple8bRleCompressor *compressor); -static void simple8brle_decompression_iterator_init_forward(Simple8bRleDecompressionIterator *iter, +static inline void simple8brle_decompression_iterator_init_forward(Simple8bRleDecompressionIterator *iter, Simple8bRleSerialized *compressed); -static void simple8brle_decompression_iterator_init_reverse(Simple8bRleDecompressionIterator *iter, +static inline void simple8brle_decompression_iterator_init_reverse(Simple8bRleDecompressionIterator *iter, Simple8bRleSerialized *compressed); -static Simple8bRleDecompressResult +static inline Simple8bRleDecompressResult simple8brle_decompression_iterator_try_next_forward(Simple8bRleDecompressionIterator *iter); -static Simple8bRleDecompressResult +static inline Simple8bRleDecompressResult simple8brle_decompression_iterator_try_next_reverse(Simple8bRleDecompressionIterator *iter); -static void simple8brle_serialized_send(StringInfo buffer, const Simple8bRleSerialized *data); -static char 
*bytes_serialize_simple8b_and_advance(char *dest, size_t expected_size, +static inline void simple8brle_serialized_send(StringInfo buffer, const Simple8bRleSerialized *data); +static inline char *bytes_serialize_simple8b_and_advance(char *dest, size_t expected_size, const Simple8bRleSerialized *data); -static Simple8bRleSerialized *bytes_deserialize_simple8b_and_advance(const char **data); -static size_t simple8brle_serialized_slot_size(const Simple8bRleSerialized *data); -static size_t simple8brle_serialized_total_size(const Simple8bRleSerialized *data); -static size_t simple8brle_compressor_compressed_size(Simple8bRleCompressor *compressor); +static inline Simple8bRleSerialized *bytes_deserialize_simple8b_and_advance(const char **data); +static inline size_t simple8brle_serialized_slot_size(const Simple8bRleSerialized *data); +static inline size_t simple8brle_serialized_total_size(const Simple8bRleSerialized *data); +static inline size_t simple8brle_compressor_compressed_size(Simple8bRleCompressor *compressor); /********************* *** Private API *** @@ -303,6 +304,12 @@ simple8brle_compressor_append(Simple8bRleCompressor *compressor, uint64 val) compressor->num_uncompressed_elements += 1; } +static bool +simple8brle_compressor_is_empty(Simple8bRleCompressor *compressor) +{ + return compressor->num_elements == 0; +} + static size_t simple8brle_compressor_compressed_size(Simple8bRleCompressor *compressor) { @@ -357,12 +364,12 @@ simple8brle_compressor_finish(Simple8bRleCompressor *compressor) uint64 bits; simple8brle_compressor_flush(compressor); - Assert(compressor->last_block_set); - simple8brle_compressor_push_block(compressor, compressor->last_block); - if (compressor->num_elements == 0) return NULL; + Assert(compressor->last_block_set); + simple8brle_compressor_push_block(compressor, compressor->last_block); + compressed_size = simple8brle_compressor_compressed_size(compressor); /* we use palloc0 despite initializing the entire structure, * to ensure 
padding bits are zeroed, and that there's a 0 seletor at the end. diff --git a/tsl/test/expected/compress_table.out b/tsl/test/expected/compress_table.out new file mode 100644 index 000000000..f98061e79 --- /dev/null +++ b/tsl/test/expected/compress_table.out @@ -0,0 +1,274 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE OR REPLACE FUNCTION ts_compress_table(in_table REGCLASS, out_table REGCLASS, compression_info _timescaledb_catalog.hypertable_compression[]) + RETURNS VOID + AS :TSL_MODULE_PATHNAME LANGUAGE C STRICT VOLATILE; +\ir include/compression_utils.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +SELECT * FROM pg_type WHERE typname='hypertable_compression'; + typname | typnamespace | typowner | typlen | typbyval | typtype | typcategory | typispreferred | typisdefined | typdelim | typrelid | typelem | typarray | typinput | typoutput | typreceive | typsend | typmodin | typmodout | typanalyze | typalign | typstorage | typnotnull | typbasetype | typtypmod | typndims | typcollation | typdefaultbin | typdefault | typacl +------------------------+--------------+----------+--------+----------+---------+-------------+----------------+--------------+----------+----------+---------+----------+-----------+------------+-------------+-------------+----------+-----------+------------+----------+------------+------------+-------------+-----------+----------+--------------+---------------+------------+-------- + hypertable_compression | 16390 | 16385 | -1 | f | c | C | f | t | , | 16666 | 0 | 16667 | record_in | record_out | record_recv | record_send | - | - | - | d | x | f | 0 | -1 | 0 | 0 | | 
| +(1 row) + +-- column name, algorithm, idx, asc, nulls_first +CREATE FUNCTION ord(TEXT, INT, INT, BOOL = true, BOOL = false) + RETURNS _timescaledb_catalog.hypertable_compression + AS $$ + SELECT (1, $1, $2::SMALLINT, -1, $3::SMALLINT, $4, $5)::_timescaledb_catalog.hypertable_compression + $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; +-- column name, idx, asc, nulls_first +CREATE FUNCTION seg(TEXT, INT, BOOL = true, BOOL = false) + RETURNS _timescaledb_catalog.hypertable_compression + AS $$ + SELECT (1, $1, 0, $2::SMALLINT, -1, $3, $4)::_timescaledb_catalog.hypertable_compression + $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; +-- column name, algorithm +CREATE FUNCTION com(TEXT, INT) + RETURNS _timescaledb_catalog.hypertable_compression + AS $$ + SELECT (1, $1, $2::SMALLINT, -1, -1, true, false)::_timescaledb_catalog.hypertable_compression + $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; +SELECT * FROM ord('time', 4, 0); + hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst +---------------+---------+--------------------------+------------------------+----------------------+-------------+-------------------- + 1 | time | 4 | -1 | 0 | t | f +(1 row) + +CREATE TABLE uncompressed( + time INT, + device INT, + data INT, + floats FLOAT(26), + nulls TEXT, + texts TEXT); +CREATE TABLE compressed( + time _timescaledb_internal.compressed_data, + device INT, + data _timescaledb_internal.compressed_data, + floats _timescaledb_internal.compressed_data, + nulls _timescaledb_internal.compressed_data, + texts _timescaledb_internal.compressed_data); +\set DATA_IN uncompressed +\set DATA_OUT uncompressed +-- _INVALID_COMPRESSION_ALGORITHM = 0, +-- COMPRESSION_ALGORITHM_ARRAY = 1, +-- COMPRESSION_ALGORITHM_DICTIONARY = 2, +-- COMPRESSION_ALGORITHM_GORILLA = 3, +-- COMPRESSION_ALGORITHM_DELTADELTA = 4, +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('data', 4), com('floats', 3), com('nulls', 1), com('texts', 
2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +-- TODO NULL decompression doesn't quite work +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(data::_timescaledb_internal.compressed_data, NULL::INT) d, _timescaledb_internal.decompress_forward(floats::_timescaledb_internal.compressed_data, NULL::FLOAT(26)) f, NULL, _timescaledb_internal.decompress_forward(texts::_timescaledb_internal.compressed_data, NULL::TEXT) e +INSERT INTO uncompressed + SELECT generate_series( 1, 5), d, d % 3, d / 3.0, NULL, d + FROM generate_series(1, 5) d; +INSERT INTO uncompressed + SELECT generate_series(6,10), d, d % 2, d / 2.0, NULL, d + FROM generate_series(1, 4) d; +INSERT INTO uncompressed + SELECT generate_series(11,15), d, d , d , NULL, d + FROM generate_series(1, 5) d; +INSERT INTO uncompressed + SELECT generate_series(16,20), d, d % 3, d / 3.0, NULL, d + FROM generate_series(1, 5) d; +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? | count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | data | floats | nulls | texts | t | device | d | f | ?column? 
| e +------+--------+------+--------+-------+-------+---+--------+---+---+----------+--- +(0 rows) + +TRUNCATE compressed; +-- test gorilla on ints +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('data', 3), com('floats', 3), com('nulls', 1), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? | count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | data | floats | nulls | texts | t | device | d | f | ?column? | e +------+--------+------+--------+-------+-------+---+--------+---+---+----------+--- +(0 rows) + +TRUNCATE compressed; +-- test Dictionary on everything +SELECT ARRAY[ord('time', 2, 0), seg('device', 0), com('data', 2), com('floats', 2), com('nulls', 2), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? | count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | data | floats | nulls | texts | t | device | d | f | ?column? 
| e +------+--------+------+--------+-------+-------+---+--------+---+---+----------+--- +(0 rows) + +TRUNCATE compressed; +-- test Array on everything +SELECT ARRAY[ord('time', 1, 0), seg('device', 0), com('data', 1), com('floats', 1), com('nulls', 1), com('texts', 1)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? | count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | data | floats | nulls | texts | t | device | d | f | ?column? | e +------+--------+------+--------+-------+-------+---+--------+---+---+----------+--- +(0 rows) + +TRUNCATE compressed; +--test reordering compression info +SELECT ARRAY[com('floats', 3), com('data', 4), seg('device', 0), ord('time', 4, 0), com('nulls', 1), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? | count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | data | floats | nulls | texts | t | device | d | f | ?column? 
| e +------+--------+------+--------+-------+-------+---+--------+---+---+----------+--- +(0 rows) + +TRUNCATE compressed; +-- test dropping a column +ALTER TABLE uncompressed DROP COLUMN data; +ALTER TABLE uncompressed DROP COLUMN nulls; +ALTER TABLE compressed DROP COLUMN data; +ALTER TABLE compressed DROP COLUMN nulls; +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(floats::_timescaledb_internal.compressed_data, NULL::FLOAT(26)) f, _timescaledb_internal.decompress_forward(texts::_timescaledb_internal.compressed_data, NULL::TEXT) e +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('floats', 3), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? 
| count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | floats | texts | t | device | f | e +------+--------+--------+-------+---+--------+---+--- +(0 rows) + +TRUNCATE compressed; +-- test adding a column +ALTER TABLE uncompressed ADD COLUMN dat2 INT DEFAULT 1; +ALTER TABLE uncompressed ADD COLUMN ord INT DEFAULT 2; +ALTER TABLE compressed ADD COLUMN dat2 _timescaledb_internal.compressed_data; +ALTER TABLE compressed ADD COLUMN ord _timescaledb_internal.compressed_data; +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(floats::_timescaledb_internal.compressed_data, NULL::FLOAT(26)) f, _timescaledb_internal.decompress_forward(texts::_timescaledb_internal.compressed_data, NULL::TEXT) e, _timescaledb_internal.decompress_forward(dat2::_timescaledb_internal.compressed_data, NULL::INT) d2, _timescaledb_internal.decompress_forward(ord::_timescaledb_internal.compressed_data, NULL::INT) o +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('floats', 3), com('texts', 2), ord('ord', 4, 1), com('dat2', 4)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? 
| count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | floats | texts | dat2 | ord | t | device | f | e | d2 | o +------+--------+--------+-------+------+-----+---+--------+---+---+----+--- +(0 rows) + +TRUNCATE compressed; +-- test skipping columns +CREATE TABLE missing_columns AS SELECT time, device, dat2 FROM uncompressed; +\set DATA_OUT missing_columns +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(dat2::_timescaledb_internal.compressed_data, NULL::INT) d2 +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('dat2', 4)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + ts_compress_table +------------------- + +(1 row) + + ?column? 
| count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + time | device | dat2 | t | device | d2 +------+--------+------+---+--------+---- +(0 rows) + +TRUNCATE compressed; +\set ON_ERROR_STOP 0 +-- test compressing a non-existent column +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('floats', 3), com('texts', 2), ord('ord', 4, 1), com('dat2', 4), com('fictional', 4)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +SELECT ts_compress_table(:'DATA_IN'::REGCLASS, 'compressed'::REGCLASS,:'COMPRESSION_INFO'::_timescaledb_catalog.hypertable_compression[]); +ERROR: could not find compressed column for "fictional" +TRUNCATE compressed; +\set ON_ERROR_STOP 1 +TRUNCATE uncompressed; diff --git a/tsl/test/expected/compression_algos.out b/tsl/test/expected/compression_algos.out index f6170aa80..b3868ee69 100644 --- a/tsl/test/expected/compression_algos.out +++ b/tsl/test/expected/compression_algos.out @@ -407,7 +407,7 @@ CREATE TABLE base_doubles AS SELECT row_number() OVER() as rn, item::double prec \set ECHO errors compressed size ----------------- - 1760 + 1808 (1 row) ?column? 
| count @@ -431,9 +431,9 @@ CREATE TABLE base_doubles AS SELECT row_number() OVER() as rn, item::double prec (1 row) SELECT c gorilla_text FROM compressed; - gorilla_text -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - AwBAe9AAAAAAAAAAA+gAAAABAAAAAAAAAA8AAD6AAAAAAQAAA+gAAAAEAAAAAAAA8fEAAAAAAAAACwAAHAAAAAAAAAAAAAAAGFsAABqAAAAAAAAAAAE8AgwgowEEggEAAAAKAAAAAQAAAAAAAAAFAAGpYk9HNZIAAADMA2Xo3137ZgMxsznBPTnx4o+sCeCD8f+x/nYgnrDwB1A6d5Ey7F0O5sKQ5v83x32fHAQIBoeXQBgZC+3/gAvppD9up1NZ8jCBDOEPTnuSBSBdGE6R97ge/YE9oBwDuEwBsHkYbJICtnsUpH/QKN8h+EhW/xeYiuz2Cugb7x9p6bAJoywd7BVwGwHxlzF2Hh2ZDmLHfEQjAU31gqQ1YYSDuEQKSE0OAXPYAxLgYwWAKuDq9xfWv3ILvJef/d/oaD5fPA1HaDzB1AxB5r455yCAebs0gMsTeCwD50YCoAq9pEkN+BdA9QGvwj8DuwLu/IfsPeYXjAAI71UEYqJ412c64g9+Lvqf7cMoTgxIGcWQXUwibag7oYQHfp3waGHAI+vSGRyQCHk73F5iyZ099PlFtMauj9RKIrIVUEx8q9KfIfSJvgHAN7px3o8QB48tEEiinttHYIL+H5KDpgc7YCvPZ2rzAB9QTJ9szEGKErd8gAPDPInuEJH9eBcVUBpADvsQZv4wEH6uKmOo5lMQDuNCXkQwIc30sD28m6Q6hAiwYgFElAXIJr7sAFCA7ICVAvX9MK/RPn39bC8er4pdRg52OTC4L4B/5w/JgM63/swEwKL9gXnwl+E89DKFkDtgo35UIoJTsfwJ9fgW7ujvvKPYPtXX0AkDgD5gMbYYffnvwruFq1d5Xd4yhyCEFLjNnDIC/8j30GW9jBLvke/uW3DPxz7bjLzqC6aRi5dRAZv1H72DzhEKCHHdcu0eUAnvMnrsHUHH4u8Ob0iCf1+z/foEQCYdAJ46CB8vF4DLaIeH1gNofCWGSnXcfroRg0QKoEsGi5N6pB6g/w2wiCLzm6EZ3sFpvV/hjLjKAJACQBvXKdcPKBM/8y+0mGkvM4Fb+H8x9aBi9tfwPnn5PIu8sHxq7+GA5/0e/wcvn/Y4HQJ5174G/EByAeP2QMYBgPUAP09HgZ4yYBze0PJMCfoX32Cp1ICOCCfMemYGDup/AIQXWD9Al7kCfP8gX3jJMgbRwoCahIC6B8wOIb7aQDT9QOI6Sgl7RgjQyvrn6fisMV5tC8cKDAa3wEPrjynUL6EC88A6lfaFRjfgq4SlJb2V98C6eduCA4Z/3cf+mVhFI+L+1s/fA5Oc3CDY/pCsYWQCxAMH/ALYMoPKFX6PFX8Hf6QloYT0BeDIj7p/KlkDjf9vyunEAY0QWgp8Qwe4CsBReeHQv64b4ImHpBRB81GbXNGh6Tjmqz40OS6KAOBIXYEugSAits27WH8Bkhh/Vf7BonOQNQRgMOCBDZ/FKSEIGeDI78jpcaRwf6Zi0DY
hpfuIKj9XTwPKCYDHBfjoCkg7A8BQhaIch0cBdhcfXggQ4nsL2B/Prf75FDD/vsd6QRNgi/3071t4o8A/SRd/7xinZAJp+AsgOkCkfqA2cegBvr8FQdsZN3g87HriLgx6jtaj08CoPd0v5B/n/lQlgbfzYDZ9rCl7+8wf3AgIBBBQdwGl6Q9uFANIF8DaB2P6fz4IcFv9qHX/CiguUYQgZXODrYHxCACvDgbkdkAVCHXvL3d/gA9/a9k8Ewch0iTmYUPJ097YAS9PgUwr3aTSg5oSMMyEKMcQsIhPIILPcX5r9yBjC0egRqdj/YYZbz/V/hT4p+mUS5eA9AUofEJHuYP8Ap+tjqOAFtPmu9khoIQMgCS+zDCBhBOPygIEJZU4D8Kg53fdFAD7vIAcHcnwH+qFAC3+hv9gRg0gfr9bzLRVsvAqAtMjgju2vNX4DxrdVq2AgahtChug1lqNVRwgz15DDwAGQf6qAHvJkRYaflMXEy4gZtidaOnBVTPKBr/8HP9miHgJFnDlmZAGB94J2EgDshHNPRfQOAF4KYG4nkOo89cSuhZ9AkIXe9Tci54BjAQB5wlw2vQIc8LiH+CR0MDvAOP3gYPLofPCocIExs+B6BEfFIOQ7nRPt/YDa4HUoup/BO39wGnM4DpyVDEFJ5NiPvF4QgVoRr1qnUsrDMAOctghAfz5/lf+8Au6tJG///cQWQFje1zMHB8p+nd4FQK8vWoJG6mQv7SGk8PXm5iiNAcn7TDnv/xT9gBMgoGHfU/WKcBMcheUf+dF7BK/uAA/34+7pwG7oJwE0KEU+8BSwQv+z5Pq8PPpkPWDejWZSfmYVoF5fAf1zDCMBl8AAAAAAAAAAA== + gorilla_text +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + 
AwBAe9AAAAAAAAAAA+gAAAABAAAAAAAAAA8AAD6AAAAAAQAAA+gAAAAEAAAAAAAA8f8AAAQQAAAAAQAAG/AAAAAAAAAAAAAAGFsAABqAAAAAAAAAAAcwggooggggggGIIIIIIINMICETDCCCCyCCggooohAgwo0IINIIIIONIDCSSiDEDjDTAAAgwgowEEgAAABIAAAABQAAAAAABURFBrWoSpTWsZLd2bx925R8edzHeqXWhNKZ2TdVfcjd3WcGpYk9HNOs6QAAAMg0ZejfXftmAzFj/WfPPef8X/X/XqufB+P/kcu7vc2E7EGeb/N8d9n+P/++mn+B3y757qdTc+cMO+/sDwh6c9yabt/ee8PSPvf38J84bk1v2L7sFUj/oFG+Q2ASrf4vMRXZNgPQN94+09MyHUZYO9gq4Jvr4y5i7Dw7cIjFjviIRgKwBwVIasMJB8HUFJCaHALneS4lwMYLAFW+ee4vrX7kFxiDP/u/0NB883YajtB5g6gHz818c85BAJIaaQGWJvBYhH6MBUAVe0jYe/AugeoDX6oIB3YF3fkPxB/MLxgAEd5QnMVE8a7OdZhE/F31P9uG/ToYkDOLILqkMttQd0MIDrzF4NDDgEfXaYw5IBDyd7gqoZM6e+nyi+kTXR+olEVkvR6Y+VelPkNFPXwDgG904yUGIA8eWiCRztW2jsEF/D/ZmUwOdsBXngeH5gA+oJk+8C6DFCVu+QAgzHkT3CEj+sdQKqA0gB32vIn8YCD9XFR5N8ymIB3GhAKIYEOb6WB7AKFIdQgRYMT6YSgLkE192F49AdkBKgXrcmFfonz7+tiTAV8Uuowc7EX7cF8A/84f6GWdb/2YCYGoRQLz4S/CefAtCyB2wUb8fasEp2P4E+vAY93R33lHsHcLr6ASBwB8DkFsMPvz34UF/1au8ru8ZSXfCClxmzhkjn2R76DLexgjFyPf3Lbhn3sHtxl51BdN5dsuogM36j/YO5wiFBDjupEEPKAT3mT1gEyDj8XeHN5eLv6/Z/v0CAbQOgE8dBA+/XUBltEPD6wNFvhLDJTruBtgIwaIFUCWvYMm9Ug9Qf6UARBF5zdCM64e03q/wxlxMNMgBIA3rlNj6lAmf+ZfafPyXmcCt/D+38NAxe2v4HxfPnkXeWD41XwNAc/6Pf4OgY3scDoE86+PA/iA5APH7OSYAwHqAH6eqQE8ZMA5vaEMHBP0L77BU36BHBBPmPTMvvHU/gEILrA1CS9yBPn+QH21kmQNo4UBlBMBdA+YHEPP0oBp+oHEdI4U9owRoZX1HlPxWGK82hd1KhgNb4CH1wlLqF9CBeeA87ftCoxvwVf9Mkt7K++BdJ+/BAcM/7uP/SCwikfF/a0P+AcnObhBsf0fWMLIBYgGQwgFsGUHlCr+VCr+Dv9IS9OJ6AvBkR90hg6yBxv+35WhfgMaILQU+CiDcBWAovPD0nFcN8ETD0gUAeajNrmjQ0BFzVZ8aHJdJDHAkLsCXQIga22bdrD+Az+K/qv9g0Tn35AIwGHBAhvFoFJCEDPBkX6v0uNI4P9MC/BsQ0v3EFShCp4HlBMBji4/0BSQdgeAsD9EOQ6OAuz/fLwQIcT2F/vpn1v98ihhki+O9IImwRfT8d628UeAfkBs/94xTsgEg7YWQHSBSP3EXOPQA31+CoFQMm7wedj1/KgY9R2tR6f7WHu6X8g/zxAQSwNv5sBs0h5S9/eYP7i0DgggoO4DS7f63CgGkC+BowjH9P58EODiEVDr/hRQXIAqQMrnB1sDAB4BXhwNyOxDpBDr3l7u/72w/teyeCYOu0hJzMKHk6cIUAJenwKYVwWfpQc0JGGZFo+OIWEQnkEy3uL81+5AxtMpQI1Ox/sM+IR/q/wp8U9bHJcvAegKULJCj3MH+AU/mGFHAC2nzXcIS0EIGQBJfe+7AwgnH5QEO5MqcB+FQc79DCgB93kAOH634D/VCgBbBaf+wIwaQP3wH5loq2XgVFDaRwR3bXmr
qjg1uq1bAQMMghQ3Qay1Giw0QZ68hh4AzbH9VAD3kyKUDPymLiZcQPATOtHTgqpnD71/+Dn+zRCaeizhyzMgDANwE7CQB2QjdCwvoHAC8FO5FzyHUeeuJRLg+gSELvepP8E8AxgIA87vA7XoEOeFxIQJI6GB3gHHKQcHl0PnhUMG142fA9AiPtv7Idzon2/sqGMDqUXU/gnwhYDTmcB05JZWCk8mxH3iA/gK0I161Tp1aBmAHOWwQgLH8/yv/eAX9O8jf//uILISNva5mDg+U4eu8CoFeXrU2mFTIX9pDSeZBTcxRGgOT4CZz3/4p+wA2CUDDvqfrFN3TuQvKP/Oi0Ipf3AAf78fnyYDd0E4CaEG9feApYIX/QLz1eHn0yHrDL5rMpPzMK0AAPgP65hhGA== (1 row) DROP TABLE base_doubles; @@ -457,7 +457,40 @@ CREATE TABLE base_doubles AS SELECT row_number() over () as rn, item FROM \set ECHO errors compressed size ----------------- - 136 + 152 +(1 row) + + ?column? | count +-------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed forward (expect 0) | 0 +(1 row) + + ?column? | count +--------------------------------------------------------------------------------+------- + Number of rows different between original and decompressed reversed (expect 0) | 0 +(1 row) + + ?column? | count +----------------------------------------------------------------------------------------------------+------- + Number of rows different between original, decompressed, and decompressed deserializeed (expect 0) | 0 +(1 row) + + ?column? | ?column? +-----------------------------------------------------------------------------------------------------+---------- + Test that deserialization, decompression, recompression, and serialization results in the same text | t +(1 row) + +DROP TABLE base_doubles; +-- all 0s +CREATE TABLE base_doubles AS SELECT row_number() over () as rn, 0::FLOAT(50) as item FROM (SELECT generate_series(1, 1000) ) j; +\ir include/compression_test.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + compressed size +----------------- + 160 (1 row) ?column? 
| count diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 17fdcf63c..6d6d63ae2 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -11,6 +11,7 @@ set(TEST_FILES ) set(TEST_FILES_DEBUG + compression_table.sql compression_algos.sql bgw_policy.sql bgw_reorder_drop_chunks.sql diff --git a/tsl/test/sql/compress_table.sql b/tsl/test/sql/compress_table.sql new file mode 100644 index 000000000..a9b390fec --- /dev/null +++ b/tsl/test/sql/compress_table.sql @@ -0,0 +1,157 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE OR REPLACE FUNCTION ts_compress_table(in_table REGCLASS, out_table REGCLASS, compression_info _timescaledb_catalog.hypertable_compression[]) + RETURNS VOID + AS :TSL_MODULE_PATHNAME LANGUAGE C STRICT VOLATILE; +\ir include/compression_utils.sql +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER + +SELECT * FROM pg_type WHERE typname='hypertable_compression'; + +-- column name, algorithm, idx, asc, nulls_first +CREATE FUNCTION ord(TEXT, INT, INT, BOOL = true, BOOL = false) + RETURNS _timescaledb_catalog.hypertable_compression + AS $$ + SELECT (1, $1, $2::SMALLINT, -1, $3::SMALLINT, $4, $5)::_timescaledb_catalog.hypertable_compression + $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- column name, idx, asc, nulls_first +CREATE FUNCTION seg(TEXT, INT, BOOL = true, BOOL = false) + RETURNS _timescaledb_catalog.hypertable_compression + AS $$ + SELECT (1, $1, 0, $2::SMALLINT, -1, $3, $4)::_timescaledb_catalog.hypertable_compression + $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- column name, algorithm +CREATE FUNCTION com(TEXT, INT) + RETURNS _timescaledb_catalog.hypertable_compression + AS $$ + SELECT (1, $1, $2::SMALLINT, -1, -1, true, false)::_timescaledb_catalog.hypertable_compression + $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +SELECT * 
FROM ord('time', 4, 0); + +CREATE TABLE uncompressed( + time INT, + device INT, + data INT, + floats FLOAT(26), + nulls TEXT, + texts TEXT); + +CREATE TABLE compressed( + time _timescaledb_internal.compressed_data, + device INT, + data _timescaledb_internal.compressed_data, + floats _timescaledb_internal.compressed_data, + nulls _timescaledb_internal.compressed_data, + texts _timescaledb_internal.compressed_data); + +\set DATA_IN uncompressed +\set DATA_OUT uncompressed + +-- _INVALID_COMPRESSION_ALGORITHM = 0, +-- COMPRESSION_ALGORITHM_ARRAY = 1, +-- COMPRESSION_ALGORITHM_DICTIONARY = 2, +-- COMPRESSION_ALGORITHM_GORILLA = 3, +-- COMPRESSION_ALGORITHM_DELTADELTA = 4, + +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('data', 4), com('floats', 3), com('nulls', 1), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +-- TODO NULL decompression doesn't quite work +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(data::_timescaledb_internal.compressed_data, NULL::INT) d, _timescaledb_internal.decompress_forward(floats::_timescaledb_internal.compressed_data, NULL::FLOAT(26)) f, NULL, _timescaledb_internal.decompress_forward(texts::_timescaledb_internal.compressed_data, NULL::TEXT) e + +INSERT INTO uncompressed + SELECT generate_series( 1, 5), d, d % 3, d / 3.0, NULL, d + FROM generate_series(1, 5) d; + +INSERT INTO uncompressed + SELECT generate_series(6,10), d, d % 2, d / 2.0, NULL, d + FROM generate_series(1, 4) d; + +INSERT INTO uncompressed + SELECT generate_series(11,15), d, d , d , NULL, d + FROM generate_series(1, 5) d; + +INSERT INTO uncompressed + SELECT generate_series(16,20), d, d % 3, d / 3.0, NULL, d + FROM generate_series(1, 5) d; + +\ir include/compress_table_test.sql +TRUNCATE compressed; + +-- test gorilla on ints +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('data', 3), 
com('floats', 3), com('nulls', 1), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +\ir include/compress_table_test.sql +TRUNCATE compressed; + +-- test Dictionary on everything +SELECT ARRAY[ord('time', 2, 0), seg('device', 0), com('data', 2), com('floats', 2), com('nulls', 2), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +\ir include/compress_table_test.sql +TRUNCATE compressed; + +-- test Array on everything +SELECT ARRAY[ord('time', 1, 0), seg('device', 0), com('data', 1), com('floats', 1), com('nulls', 1), com('texts', 1)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +\ir include/compress_table_test.sql +TRUNCATE compressed; + +--test reordering compression info +SELECT ARRAY[com('floats', 3), com('data', 4), seg('device', 0), ord('time', 4, 0), com('nulls', 1), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset +\ir include/compress_table_test.sql +TRUNCATE compressed; + +-- test dropping a column +ALTER TABLE uncompressed DROP COLUMN data; +ALTER TABLE uncompressed DROP COLUMN nulls; +ALTER TABLE compressed DROP COLUMN data; +ALTER TABLE compressed DROP COLUMN nulls; + +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(floats::_timescaledb_internal.compressed_data, NULL::FLOAT(26)) f, _timescaledb_internal.decompress_forward(texts::_timescaledb_internal.compressed_data, NULL::TEXT) e + +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('floats', 3), com('texts', 2)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +\ir include/compress_table_test.sql +TRUNCATE compressed; + +-- test adding a column +ALTER TABLE uncompressed ADD COLUMN dat2 INT DEFAULT 1; +ALTER TABLE uncompressed ADD COLUMN ord INT DEFAULT 2; +ALTER TABLE compressed ADD 
COLUMN dat2 _timescaledb_internal.compressed_data; +ALTER TABLE compressed ADD COLUMN ord _timescaledb_internal.compressed_data; + +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(floats::_timescaledb_internal.compressed_data, NULL::FLOAT(26)) f, _timescaledb_internal.decompress_forward(texts::_timescaledb_internal.compressed_data, NULL::TEXT) e, _timescaledb_internal.decompress_forward(dat2::_timescaledb_internal.compressed_data, NULL::INT) d2, _timescaledb_internal.decompress_forward(ord::_timescaledb_internal.compressed_data, NULL::INT) o + +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('floats', 3), com('texts', 2), ord('ord', 4, 1), com('dat2', 4)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +\ir include/compress_table_test.sql +TRUNCATE compressed; + +-- test skipping columns +CREATE TABLE missing_columns AS SELECT time, device, dat2 FROM uncompressed; +\set DATA_OUT missing_columns + +\set DECOMPRESS_FORWARD_CMD _timescaledb_internal.decompress_forward(time::_timescaledb_internal.compressed_data, NULL::INT) t, device, _timescaledb_internal.decompress_forward(dat2::_timescaledb_internal.compressed_data, NULL::INT) d2 + +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('dat2', 4)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +\ir include/compress_table_test.sql +TRUNCATE compressed; + + +\set ON_ERROR_STOP 0 + +-- test compressing a non-existent column +SELECT ARRAY[ord('time', 4, 0), seg('device', 0), com('floats', 3), com('texts', 2), ord('ord', 4, 1), com('dat2', 4), com('fictional', 4)]::_timescaledb_catalog.hypertable_compression[] AS "COMPRESSION_INFO" \gset + +SELECT ts_compress_table(:'DATA_IN'::REGCLASS, 'compressed'::REGCLASS,:'COMPRESSION_INFO'::_timescaledb_catalog.hypertable_compression[]); +TRUNCATE compressed; + +\set ON_ERROR_STOP 1 + 
+TRUNCATE uncompressed; diff --git a/tsl/test/sql/compression_algos.sql b/tsl/test/sql/compression_algos.sql index 996f4a18b..e9ec6997e 100644 --- a/tsl/test/sql/compression_algos.sql +++ b/tsl/test/sql/compression_algos.sql @@ -172,6 +172,11 @@ CREATE TABLE base_doubles AS SELECT row_number() over () as rn, item FROM \ir include/compression_test.sql DROP TABLE base_doubles; +-- all 0s +CREATE TABLE base_doubles AS SELECT row_number() over () as rn, 0::FLOAT(50) as item FROM (SELECT generate_series(1, 1000) ) j; +\ir include/compression_test.sql +DROP TABLE base_doubles; + -- NULLs CREATE TABLE base_doubles AS SELECT row_number() OVER() as rn, NULLIF(i, 5)::DOUBLE PRECISION item FROM generate_series(1, 10) i; \ir include/compression_test.sql diff --git a/tsl/test/sql/include/compress_table_test.sql b/tsl/test/sql/include/compress_table_test.sql new file mode 100644 index 000000000..7dcd02049 --- /dev/null +++ b/tsl/test/sql/include/compress_table_test.sql @@ -0,0 +1,33 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. 
+ +\set ECHO errors + +TRUNCATE TABLE compressed; +SELECT ts_compress_table(:'DATA_IN'::REGCLASS, 'compressed'::REGCLASS, :'COMPRESSION_INFO'::_timescaledb_catalog.hypertable_compression[]); + +--test that decompression gives same result in forward direction +WITH original AS ( + SELECT * FROM :DATA_OUT ORDER BY device, time +), +decompressed AS ( + SELECT * FROM (SELECT :DECOMPRESS_FORWARD_CMD FROM compressed ORDER BY device) AS q +) +SELECT 'Number of rows different between original and decompressed forward (expect 0)', count(*) +FROM original +FULL OUTER JOIN decompressed ON (original.device = decompressed.device AND original.time = decompressed.t) +WHERE (original.*) IS DISTINCT FROM (decompressed.*); + +with original AS ( + SELECT * FROM :DATA_OUT AS q ORDER BY device, time +), +decompressed AS ( + SELECT * FROM (SELECT :DECOMPRESS_FORWARD_CMD FROM compressed) AS q +) +SELECT * +FROM original +FULL OUTER JOIN decompressed ON (original.device = decompressed.device AND original.time = decompressed.t) +WHERE (original.*) IS DISTINCT FROM (decompressed.*); + +\set ECHO all diff --git a/tsl/test/src/test_compression.c b/tsl/test/src/test_compression.c index 65f50932d..85d7d0129 100644 --- a/tsl/test/src/test_compression.c +++ b/tsl/test/src/test_compression.c @@ -6,13 +6,21 @@ #include +#include +#include #include #include -#include -#include #include +#include +#include +#include +#include +#include +#include +#include #include +#include #include #include "compression/array.h" @@ -21,7 +29,15 @@ #include "compression/deltadelta.h" #include "compression/utils.h" +#define VEC_PREFIX compression_info +#define VEC_ELEMENT_TYPE Form_hypertable_compression +#define VEC_DECLARE 1 +#define VEC_DEFINE 1 +#define VEC_SCOPE static inline +#include + TS_FUNCTION_INFO_V1(ts_test_compression); +TS_FUNCTION_INFO_V1(ts_compress_table); #define AssertInt64Eq(a, b) \ do \ @@ -250,7 +266,7 @@ test_gorilla_int() compressed = gorilla_compressor_finish(compressor); Assert(compressed != 
NULL); - AssertInt64Eq(VARSIZE(compressed), 1304); + AssertInt64Eq(VARSIZE(compressed), 1344); i = 0; iter = gorilla_decompression_iterator_from_datum_forward(PointerGetDatum(compressed), INT8OID); @@ -316,7 +332,7 @@ test_gorilla_float() compressed = gorilla_compressor_finish(compressor); Assert(compressed != NULL); - AssertInt64Eq(VARSIZE(compressed), 1168); + AssertInt64Eq(VARSIZE(compressed), 1200); i = 0; iter = @@ -354,7 +370,7 @@ test_gorilla_double() compressed = gorilla_compressor_finish(compressor); Assert(compressed != NULL); - AssertInt64Eq(VARSIZE(compressed), 1176); + AssertInt64Eq(VARSIZE(compressed), 1200); i = 0; iter = @@ -455,3 +471,63 @@ ts_test_compression(PG_FUNCTION_ARGS) test_delta2(); PG_RETURN_VOID(); } + +static compression_info_vec * +compression_info_from_array(ArrayType *compression_info_arr, Oid form_oid) +{ + ArrayMetaState compression_info_arr_meta = { + .element_type = form_oid, + }; + ArrayIterator compression_info_iter; + Datum compression_info_datum; + bool is_null; + compression_info_vec *compression_info = compression_info_vec_create(CurrentMemoryContext, 0); + TupleDesc form_desc = NULL; + + get_typlenbyvalalign(compression_info_arr_meta.element_type, + &compression_info_arr_meta.typlen, + &compression_info_arr_meta.typbyval, + &compression_info_arr_meta.typalign); + + compression_info_iter = + array_create_iterator(compression_info_arr, 0, &compression_info_arr_meta); + + while (array_iterate(compression_info_iter, &compression_info_datum, &is_null)) + { + HeapTupleHeader form; + HeapTupleData tmptup; + + Assert(!is_null); + form = DatumGetHeapTupleHeaderCopy(compression_info_datum); + Assert(HeapTupleHeaderGetTypeId(form) == form_oid); + if (form_desc == NULL) + { + int32 formTypmod = HeapTupleHeaderGetTypMod(form); + form_desc = lookup_rowtype_tupdesc(form_oid, formTypmod); + } + + tmptup.t_len = HeapTupleHeaderGetDatumLength(form); + tmptup.t_data = form; + compression_info_vec_append(compression_info, (void *) 
GETSTRUCT(&tmptup)); + } + if (form_desc != NULL) + ReleaseTupleDesc(form_desc); + return compression_info; +} + +Datum +ts_compress_table(PG_FUNCTION_ARGS) +{ + Oid in_table = PG_GETARG_OID(0); + Oid out_table = PG_GETARG_OID(1); + ArrayType *compression_info_array = DatumGetArrayTypeP(PG_GETARG_DATUM(2)); + compression_info_vec *compression_info = + compression_info_from_array(compression_info_array, compression_info_array->elemtype); + + compress_chunk(in_table, + out_table, + (const ColumnCompressionInfo **) compression_info->data, + compression_info->num_elements); + + PG_RETURN_VOID(); +}