From b9674600ae4771ec6552c6154fea30069515920f Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Fri, 23 Aug 2019 11:25:24 -0400 Subject: [PATCH] Add segment meta min/max Add the type for min/max segment meta object. Segment metadata objects keep metadata about data in segments (compressed rows). The min/max variant keeps the min and max values inside the compressed object. It will be used on compression order by columns to allow queries that have quals on those columns to be able to exclude entire segments if no uncompressed rows in the segment may match the qual. We also add generalized infrastructure for datum serialization / deserialization for arbitrary types to and from memory as well as binary strings. --- sql/CMakeLists.txt | 1 + sql/compression.sql | 18 + sql/pre_install/types.sql | 37 ++ sql/updates/latest-dev.sql | 39 +- src/CMakeLists.txt | 1 + src/compression_segment_meta_min_max.c | 107 +++++ src/cross_module_fn.c | 52 ++- src/cross_module_fn.h | 22 +- tsl/src/compression/CMakeLists.txt | 2 + tsl/src/compression/datum_serialize.c | 388 ++++++++++++++++++ tsl/src/compression/datum_serialize.h | 36 ++ tsl/src/compression/segment_meta.c | 258 ++++++++++++ tsl/src/compression/segment_meta.h | 39 ++ tsl/src/init.c | 6 + tsl/test/expected/compression_algos.out | 2 +- tsl/test/expected/compression_hypertable.out | 2 +- .../expected/compression_segment_meta.out | 275 +++++++++++++ tsl/test/sql/CMakeLists.txt | 1 + tsl/test/sql/compression_segment_meta.sql | 190 +++++++++ .../include/compression_test_segment_meta.sql | 28 ++ tsl/test/sql/include/rand_generator.sql | 2 +- tsl/test/src/test_compression.c | 51 +++ 22 files changed, 1540 insertions(+), 17 deletions(-) create mode 100644 sql/compression.sql create mode 100644 src/compression_segment_meta_min_max.c create mode 100644 tsl/src/compression/datum_serialize.c create mode 100644 tsl/src/compression/datum_serialize.h create mode 100644 tsl/src/compression/segment_meta.c create mode 100644 tsl/src/compression/segment_meta.h create mode 100644 tsl/test/expected/compression_segment_meta.out create mode 100644 tsl/test/sql/compression_segment_meta.sql create mode 100644 tsl/test/sql/include/compression_test_segment_meta.sql diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 3795f7e3c..b4e3269f5 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -21,6 +21,7 @@ set(IMMUTABLE_API_SOURCE_FILES set(SOURCE_FILES hypertable.sql chunk.sql + compression.sql ddl_internal.sql edition.sql util_time.sql diff --git a/sql/compression.sql b/sql/compression.sql new file mode 100644 index 000000000..5d7a07a25 --- /dev/null +++ b/sql/compression.sql @@ -0,0 +1,18 @@ +-- This file and its contents are licensed under the Apache License 2.0. +-- Please see the included NOTICE for copyright information and +-- LICENSE-APACHE for a copy of the license. + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_get_min(_timescaledb_internal.segment_meta_min_max, ANYELEMENT) + RETURNS ANYELEMENT + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_get_min' + LANGUAGE C IMMUTABLE; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_get_max(_timescaledb_internal.segment_meta_min_max, ANYELEMENT) + RETURNS ANYELEMENT + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_get_max' + LANGUAGE C IMMUTABLE; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_has_null(_timescaledb_internal.segment_meta_min_max) + RETURNS boolean + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_has_null' + LANGUAGE C IMMUTABLE; diff --git a/sql/pre_install/types.sql b/sql/pre_install/types.sql index 6a053455b..ccde98aa8 100644 --- a/sql/pre_install/types.sql +++ b/sql/pre_install/types.sql @@ -57,3 +57,40 @@ CREATE TYPE _timescaledb_internal.compressed_data ( RECEIVE = _timescaledb_internal.compressed_data_recv, SEND = _timescaledb_internal.compressed_data_send ); + +-- +-- _timescaledb_internal.segment_meta_min_max keeps the min/max range of compressed data +-- + +CREATE TYPE _timescaledb_internal.segment_meta_min_max; + +--the textual input/output is simply base64 encoding of the binary representation +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_in(CSTRING) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_in' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_out(_timescaledb_internal.segment_meta_min_max) + RETURNS CSTRING + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_out' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_send(_timescaledb_internal.segment_meta_min_max) + RETURNS BYTEA + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_send' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_recv(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_recv' + LANGUAGE C IMMUTABLE STRICT; + +CREATE TYPE _timescaledb_internal.segment_meta_min_max ( + INTERNALLENGTH = VARIABLE, + STORAGE = EXTERNAL, --move to toast, don't compress + ALIGNMENT = DOUBLE, --needed for alignment to work with arbitrary datums + INPUT = _timescaledb_internal.segment_meta_min_max_in, + OUTPUT = _timescaledb_internal.segment_meta_min_max_out, + RECEIVE = _timescaledb_internal.segment_meta_min_max_recv, + SEND = _timescaledb_internal.segment_meta_min_max_send +); diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index 8c4583444..c5f095eef 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -211,5 +211,42 @@ insert into _timescaledb_catalog.compression_algorithm values ( 2, 1, 'COMPRESSION_ALGORITHM_DICTIONARY', 'dictionary'), ( 3, 1, 'COMPRESSION_ALGORITHM_GORILLA', 'gorilla'), ( 4, 1, 'COMPRESSION_ALGORITHM_DELTADELTA', 'deltadelta') -on conflict(id) do update set (version, name, description) +on conflict(id) do update set (version, name, description) = (excluded.version, excluded.name, excluded.description); + +-- +-- _timescaledb_internal.segment_meta_min_max keeps the min/max range of compressed data +-- + +CREATE TYPE _timescaledb_internal.segment_meta_min_max; + +--the textual input/output is simply base64 encoding of the binary representation +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_in(CSTRING) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_in' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_out(_timescaledb_internal.segment_meta_min_max) + RETURNS CSTRING + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_out' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_send(_timescaledb_internal.segment_meta_min_max) + RETURNS BYTEA + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_send' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_recv(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_recv' + LANGUAGE C IMMUTABLE STRICT; + +CREATE TYPE _timescaledb_internal.segment_meta_min_max ( + INTERNALLENGTH = VARIABLE, + STORAGE = EXTERNAL, --move to toast, don't compress + ALIGNMENT = DOUBLE, --needed for alignment to work with arbitrary datums + INPUT = _timescaledb_internal.segment_meta_min_max_in, + OUTPUT = _timescaledb_internal.segment_meta_min_max_out, + RECEIVE = _timescaledb_internal.segment_meta_min_max_recv, + SEND = _timescaledb_internal.segment_meta_min_max_send +); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fc9e33758..ebef9e1c6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -18,6 +18,7 @@ set(SOURCES cross_module_fn.c copy.c compression_with_clause.c + compression_segment_meta_min_max.c dimension.c dimension_slice.c dimension_vector.c diff --git a/src/compression_segment_meta_min_max.c b/src/compression_segment_meta_min_max.c new file mode 100644 index 000000000..ecc296237 --- /dev/null +++ b/src/compression_segment_meta_min_max.c @@ -0,0 +1,107 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. + * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. + */ + +#include + +#include "cross_module_fn.h" +#include "compat.h" +#include "base64_compat.h" +#include "license_guc.h" + +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_send); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_recv); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_out); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_in); + +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_get_min); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_get_max); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_has_null); + +Datum +ts_segment_meta_min_max_send(PG_FUNCTION_ARGS) +{ + Datum meta = PG_GETARG_DATUM(0); + PG_RETURN_DATUM(PointerGetDatum(ts_cm_functions->segment_meta_min_max_send(meta))); +} + +Datum +ts_segment_meta_min_max_recv(PG_FUNCTION_ARGS) +{ + StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); + PG_RETURN_DATUM(ts_cm_functions->segment_meta_min_max_recv(buf)); +} + +Datum +ts_segment_meta_min_max_out(PG_FUNCTION_ARGS) +{ + Datum meta = PG_GETARG_DATUM(0); + bytea *bytes = ts_cm_functions->segment_meta_min_max_send(meta); + + int raw_len = VARSIZE_ANY_EXHDR(bytes); + const char *raw_data = VARDATA(bytes); + int encoded_len = pg_b64_enc_len(raw_len); + char *encoded = palloc(encoded_len + 1); + encoded_len = pg_b64_encode(raw_data, raw_len, encoded); + encoded[encoded_len] = '\0'; + + PG_RETURN_CSTRING(encoded); +} + +Datum +ts_segment_meta_min_max_in(PG_FUNCTION_ARGS) +{ + const char *input = PG_GETARG_CSTRING(0); + size_t input_len = strlen(input); + int decoded_len; + char *decoded; + StringInfoData data; + + /* Load TSL explicitly in case this is called during parsing */ + ts_license_enable_module_loading(); + + if (input_len > PG_INT32_MAX) + elog(ERROR, "input too long"); + + decoded_len = pg_b64_dec_len(input_len); + decoded = palloc(decoded_len + 1); + decoded_len = pg_b64_decode(input, input_len, decoded); + decoded[decoded_len] = '\0'; + data = (StringInfoData){ + .data = decoded, + .len = decoded_len, + .maxlen = decoded_len, + }; + + PG_RETURN_DATUM(ts_cm_functions->segment_meta_min_max_recv(&data)); +} + +Datum +ts_segment_meta_min_max_get_min(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + PG_RETURN_DATUM(PointerGetDatum( + ts_cm_functions->segment_meta_min_max_get_min(PG_GETARG_DATUM(0), + get_fn_expr_argtype(fcinfo->flinfo, 1)))); +} + +Datum +ts_segment_meta_min_max_get_max(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + PG_RETURN_DATUM(PointerGetDatum( + ts_cm_functions->segment_meta_min_max_get_max(PG_GETARG_DATUM(0), + get_fn_expr_argtype(fcinfo->flinfo, 1)))); +} + +Datum +ts_segment_meta_min_max_has_null(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + PG_RETURN_BOOL(true); + PG_RETURN_BOOL(ts_cm_functions->segment_meta_min_max_has_null(PG_GETARG_DATUM(0))); +} diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 069d02b35..68b4faedf 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -336,6 +336,41 @@ continuous_agg_drop_chunks_by_chunk_id_default(int32 raw_hypertable_id, Chunk ** error_no_default_fn_community(); } +static bytea * +segment_meta_min_max_send_default(Datum arg1) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static Datum +segment_meta_min_max_recv_default(StringInfo buf) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static Datum +segment_meta_min_max_get_min_default(Datum meta, Oid type) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static Datum +segment_meta_min_max_get_max_default(Datum meta, Oid type) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static bool +segment_meta_min_max_has_null_default(Datum meta) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + /* * Define cross-module functions' default values: * If the submodule isn't activated, using one of the cm functions will throw an @@ -377,12 +412,22 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .continuous_agg_drop_chunks_by_chunk_id = continuous_agg_drop_chunks_by_chunk_id_default, .continuous_agg_trigfn = error_no_default_fn_pg_community, .continuous_agg_update_options = continuous_agg_update_options_default, - .compressed_data_decompress_forward = error_no_default_fn_pg_community, - .compressed_data_decompress_reverse = error_no_default_fn_pg_community, + .compressed_data_send = error_no_default_fn_pg_community, .compressed_data_recv = error_no_default_fn_pg_community, .compressed_data_in = error_no_default_fn_pg_community, .compressed_data_out = error_no_default_fn_pg_community, + .process_compress_table = process_compress_table_default, + .compress_chunk = error_no_default_fn_pg_enterprise, + .decompress_chunk = error_no_default_fn_pg_enterprise, + .segment_meta_min_max_send = segment_meta_min_max_send_default, + .segment_meta_min_max_recv = segment_meta_min_max_recv_default, + .segment_meta_min_max_get_min = segment_meta_min_max_get_min_default, + .segment_meta_min_max_get_max = segment_meta_min_max_get_max_default, + .segment_meta_min_max_has_null = segment_meta_min_max_has_null_default, + + .compressed_data_decompress_forward = error_no_default_fn_pg_community, + .compressed_data_decompress_reverse = error_no_default_fn_pg_community, .deltadelta_compressor_append = error_no_default_fn_pg_community, .deltadelta_compressor_finish = error_no_default_fn_pg_community, .gorilla_compressor_append = error_no_default_fn_pg_community, @@ -391,9 +436,6 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .dictionary_compressor_finish = error_no_default_fn_pg_community, .array_compressor_append = error_no_default_fn_pg_community, .array_compressor_finish = error_no_default_fn_pg_community, - .process_compress_table = process_compress_table_default, - .compress_chunk = error_no_default_fn_pg_enterprise, - .decompress_chunk = error_no_default_fn_pg_enterprise, }; TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default; diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 7668a0a32..3a4215943 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -69,6 +69,20 @@ typedef struct CrossModuleFunctions void (*continuous_agg_update_options)(ContinuousAgg *cagg, WithClauseResult *with_clause_options); + PGFunction compressed_data_send; + PGFunction compressed_data_recv; + PGFunction compressed_data_in; + PGFunction compressed_data_out; + bool (*process_compress_table)(AlterTableCmd *cmd, Hypertable *ht, + WithClauseResult *with_clause_options); + PGFunction compress_chunk; + PGFunction decompress_chunk; + bytea *(*segment_meta_min_max_send)(Datum); + Datum (*segment_meta_min_max_recv)(StringInfo buf); + Datum (*segment_meta_min_max_get_min)(Datum, Oid type); + Datum (*segment_meta_min_max_get_max)(Datum, Oid type); + bool (*segment_meta_min_max_has_null)(Datum); + /* The compression functions below are not installed in SQL as part of create extension; * They are installed and tested during testing scripts. They are exposed in cross-module * functions because they may be very useful for debugging customer problems if the sql @@ -76,10 +90,6 @@ typedef struct CrossModuleFunctions */ PGFunction compressed_data_decompress_forward; PGFunction compressed_data_decompress_reverse; - PGFunction compressed_data_send; - PGFunction compressed_data_recv; - PGFunction compressed_data_in; - PGFunction compressed_data_out; PGFunction deltadelta_compressor_append; PGFunction deltadelta_compressor_finish; PGFunction gorilla_compressor_append; @@ -88,10 +98,6 @@ typedef struct CrossModuleFunctions PGFunction dictionary_compressor_finish; PGFunction array_compressor_append; PGFunction array_compressor_finish; - bool (*process_compress_table)(AlterTableCmd *cmd, Hypertable *ht, - WithClauseResult *with_clause_options); - PGFunction compress_chunk; - PGFunction decompress_chunk; } CrossModuleFunctions; extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions; diff --git a/tsl/src/compression/CMakeLists.txt b/tsl/src/compression/CMakeLists.txt index 2a9a3a9ad..efc81224f 100644 --- a/tsl/src/compression/CMakeLists.txt +++ b/tsl/src/compression/CMakeLists.txt @@ -3,8 +3,10 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/compression.c ${CMAKE_CURRENT_SOURCE_DIR}/create.c ${CMAKE_CURRENT_SOURCE_DIR}/compress_utils.c + ${CMAKE_CURRENT_SOURCE_DIR}/datum_serialize.c ${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c ${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c ${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c + ${CMAKE_CURRENT_SOURCE_DIR}/segment_meta.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/compression/datum_serialize.c b/tsl/src/compression/datum_serialize.c new file mode 100644 index 000000000..860930b80 --- /dev/null +++ b/tsl/src/compression/datum_serialize.c @@ -0,0 +1,388 @@ + +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "datum_serialize.h" +#include "compat.h" + +typedef struct DatumSerializer +{ + Oid type_oid; + bool type_by_val; + int16 type_len; + char type_align; + char type_storage; + Oid type_send; + Oid type_out; + + /* lazy load */ + bool send_info_set; + FmgrInfo send_flinfo; + bool use_binary_send; +} DatumSerializer; + +DatumSerializer * +create_datum_serializer(Oid type_oid) +{ + DatumSerializer *res = palloc(sizeof(*res)); + /* we use the syscache and not the type cache here b/c we need the + * send/recv in/out functions that aren't in type cache */ + Form_pg_type type; + HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", type_oid); + type = (Form_pg_type) GETSTRUCT(tup); + + *res = (DatumSerializer){ + .type_oid = type_oid, + .type_by_val = type->typbyval, + .type_len = type->typlen, + .type_align = type->typalign, + .type_storage = type->typstorage, + .type_send = type->typsend, + .type_out = type->typoutput, + }; + + ReleaseSysCache(tup); + return res; +} + +static inline void +load_send_fn(DatumSerializer *ser) +{ + if (ser->send_info_set) + return; + + ser->send_info_set = true; + + ser->use_binary_send = OidIsValid(ser->type_send); + + if (ser->use_binary_send) + fmgr_info(ser->type_send, &ser->send_flinfo); + else + fmgr_info(ser->type_out, &ser->send_flinfo); +} + +#define TYPE_IS_PACKABLE(typlen, typstorage) ((typlen) == -1 && (typstorage) != 'p') + +/* Inspired by datum_compute_size in rangetypes.c */ +Size +datum_get_bytes_size(DatumSerializer *serializer, Size start_offset, Datum val) +{ + Size data_length = start_offset; + + if (serializer->type_len == -1) + { + /* varlena */ + Pointer ptr = DatumGetPointer(val); + + if (VARATT_IS_EXTERNAL(ptr)) + { + /* + * Throw error, because we should never get a toasted datum. + * Caller should have detoasted it. + */ + elog(ERROR, "datum should be detoasted before passed to datum_get_bytes_size"); + } + } + + if (TYPE_IS_PACKABLE(serializer->type_len, serializer->type_storage) && + VARATT_CAN_MAKE_SHORT(DatumGetPointer(val))) + { + /* + * we're anticipating converting to a short varlena header, so adjust + * length and don't count any alignment (the case where the Datum is already + * in short format is handled by att_align_datum) + */ + data_length += VARATT_CONVERTED_SHORT_SIZE(DatumGetPointer(val)); + } + else + { + data_length = + att_align_datum(data_length, serializer->type_align, serializer->type_len, val); + data_length = att_addlength_datum(data_length, serializer->type_len, val); + } + + return data_length; +} + +static void +check_allowed_data_len(Size data_length, Size max_size) +{ + if (max_size < data_length) + elog(ERROR, "trying to serialize more data than was allocated"); +} + +static inline char * +align_and_zero(char *ptr, char type_align, Size *max_size) +{ + char *new_pos = (char *) att_align_nominal(ptr, type_align); + if (new_pos != ptr) + { + Size padding = new_pos - ptr; + check_allowed_data_len(padding, *max_size); + memset(ptr, 0, padding); + *max_size = *max_size - padding; + } + return new_pos; +} + +/* Inspired by datum_write in rangetypes.c. This reduces the max_size by the data length before + * exiting */ +char * +datum_to_bytes_and_advance(DatumSerializer *serializer, char *ptr, Size *max_size, Datum datum) +{ + Size data_length; + + if (serializer->type_by_val) + { + /* pass-by-value */ + ptr = align_and_zero(ptr, serializer->type_align, max_size); + data_length = serializer->type_len; + check_allowed_data_len(data_length, *max_size); + store_att_byval(ptr, datum, data_length); + } + else if (serializer->type_len == -1) + { + /* varlena */ + Pointer val = DatumGetPointer(datum); + + if (VARATT_IS_EXTERNAL(val)) + { + /* + * Throw error, because we should never get a toast datum. + * Caller should have detoasted it. + */ + elog(ERROR, "datum should be detoasted before passed to datum_to_bytes_and_advance"); + data_length = 0; /* keep compiler quiet */ + } + else if (VARATT_IS_SHORT(val)) + { + /* no alignment for short varlenas */ + data_length = VARSIZE_SHORT(val); + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, val, data_length); + } + else if (TYPE_IS_PACKABLE(serializer->type_len, serializer->type_storage) && + VARATT_CAN_MAKE_SHORT(val)) + { + /* convert to short varlena -- no alignment */ + data_length = VARATT_CONVERTED_SHORT_SIZE(val); + check_allowed_data_len(data_length, *max_size); + SET_VARSIZE_SHORT(ptr, data_length); + memcpy(ptr + 1, VARDATA(val), data_length - 1); + } + else + { + /* full 4-byte header varlena */ + ptr = align_and_zero(ptr, serializer->type_align, max_size); + data_length = VARSIZE(val); + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, val, data_length); + } + } + else if (serializer->type_len == -2) + { + /* cstring ... never needs alignment */ + Assert(serializer->type_align == 'c'); + data_length = strlen(DatumGetCString(datum)) + 1; + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, DatumGetPointer(datum), data_length); + } + else + { + /* fixed-length pass-by-reference */ + ptr = align_and_zero(ptr, serializer->type_align, max_size); + Assert(serializer->type_len > 0); + data_length = serializer->type_len; + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, DatumGetPointer(datum), data_length); + } + + ptr += data_length; + *max_size = *max_size - data_length; + + return ptr; +} + +typedef struct DatumDeserializer +{ + bool type_by_val; + int16 type_len; + char type_align; + char type_storage; + + Oid type_recv; + + Oid type_in; + Oid type_io_param; + int32 type_mod; + /* lazy load */ + bool recv_info_set; + FmgrInfo recv_flinfo; + bool use_binary_recv; +} DatumDeserializer; + +DatumDeserializer * +create_datum_deserializer(Oid type_oid) +{ + DatumDeserializer *res = palloc(sizeof(*res)); + /* we use the syscache and not the type cache here b/c we need the + * send/recv in/out functions that aren't in type cache */ + Form_pg_type type; + HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", type_oid); + type = (Form_pg_type) GETSTRUCT(tup); + + *res = (DatumDeserializer){ + .type_by_val = type->typbyval, + .type_len = type->typlen, + .type_align = type->typalign, + .type_storage = type->typstorage, + .type_recv = type->typreceive, + .type_in = type->typinput, + .type_io_param = getTypeIOParam(tup), + .type_mod = type->typtypmod, + }; + + ReleaseSysCache(tup); + return res; +} + +static inline void +load_recv_fn(DatumDeserializer *des, bool use_binary) +{ + if (des->recv_info_set && des->use_binary_recv == use_binary) + return; + + des->recv_info_set = true; + des->use_binary_recv = use_binary; + + if (des->use_binary_recv) + fmgr_info(des->type_recv, &des->recv_flinfo); + else + fmgr_info(des->type_in, &des->recv_flinfo); +} + +/* Loosely based on `range_deserialize` in rangetypes.c */ +Datum +bytes_to_datum_and_advance(DatumDeserializer *deserializer, char **ptr) +{ + Datum res; + + /* att_align_pointer can handle the case where an unaligned short-varlen follows any other + * varlen by detecting padding. padding bytes _must always_ be set to 0, while the first byte of + * a varlen header is _never_ 0. This means that if the next byte is non-zero, it must be the + * start of a short-varlen, otherwise we need to align the pointer. + */ + + *ptr = + (Pointer) att_align_pointer(*ptr, deserializer->type_align, deserializer->type_len, *ptr); + res = fetch_att(*ptr, deserializer->type_by_val, deserializer->type_len); + *ptr = att_addlength_pointer(*ptr, deserializer->type_len, *ptr); + return res; +} + +void +type_append_to_binary_string(Oid type_oid, StringInfo buffer) +{ + Form_pg_type type_tuple; + HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid)); + char *namespace_name; + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", type_oid); + + type_tuple = (Form_pg_type) GETSTRUCT(tup); + + namespace_name = get_namespace_name(type_tuple->typnamespace); + + pq_sendstring(buffer, namespace_name); + pq_sendstring(buffer, NameStr(type_tuple->typname)); + + ReleaseSysCache(tup); +} + +Oid +binary_string_get_type(StringInfo buffer) +{ + const char *element_type_namespace = pq_getmsgstring(buffer); + const char *element_type_name = pq_getmsgstring(buffer); + Oid namespace_oid; + Oid type_oid; + + namespace_oid = LookupExplicitNamespace(element_type_namespace, false); + + type_oid = GetSysCacheOid2(TYPENAMENSP, + PointerGetDatum(element_type_name), + ObjectIdGetDatum(namespace_oid)); + if (!OidIsValid(type_oid)) + elog(ERROR, "could not find type %s.%s", element_type_namespace, element_type_name); + + return type_oid; +} + +void +datum_append_to_binary_string(DatumSerializer *serializer, StringInfo buffer, Datum datum) +{ + load_send_fn(serializer); + pq_sendbyte(buffer, serializer->use_binary_send); + if (serializer->use_binary_send) + { + bytea *output = SendFunctionCall(&serializer->send_flinfo, datum); + pq_sendint32(buffer, VARSIZE_ANY_EXHDR(output)); + pq_sendbytes(buffer, VARDATA(output), VARSIZE_ANY_EXHDR(output)); + } + else + { + char *output = OutputFunctionCall(&serializer->send_flinfo, datum); + pq_sendstring(buffer, output); + } +} + +Datum +binary_string_to_datum(DatumDeserializer *deserializer, StringInfo buffer) +{ + bool use_binary_recv = pq_getmsgbyte(buffer) != 0; + Datum res; + load_recv_fn(deserializer, use_binary_recv); + + if (use_binary_recv) + { + uint32 data_size = pq_getmsgint32(buffer); + const char *bytes = pq_getmsgbytes(buffer, data_size); + StringInfoData d = { + .data = (char *) bytes, + .len = data_size, + .maxlen = data_size, + }; + res = ReceiveFunctionCall(&deserializer->recv_flinfo, + &d, + deserializer->type_io_param, + deserializer->type_mod); + } + else + { + const char *string = pq_getmsgstring(buffer); + res = InputFunctionCall(&deserializer->recv_flinfo, + (char *) string, + deserializer->type_io_param, + deserializer->type_mod); + } + return res; +} diff --git a/tsl/src/compression/datum_serialize.h b/tsl/src/compression/datum_serialize.h new file mode 100644 index 000000000..39a0fb274 --- /dev/null +++ b/tsl/src/compression/datum_serialize.h @@ -0,0 +1,36 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_COMPRESSION_DATUM_SERIALIZE_H +#define TIMESCALEDB_TSL_COMPRESSION_DATUM_SERIALIZE_H + +#include +#include + +/* SERIALIZATION */ +typedef struct DatumSerializer DatumSerializer; +DatumSerializer *create_datum_serializer(Oid type); + +/* serialize to bytes in memory. */ +Size datum_get_bytes_size(DatumSerializer *serializer, Size start_offset, Datum val); +char *datum_to_bytes_and_advance(DatumSerializer *serializer, char *start, Size *max_size, + Datum val); + +/* serialize to a binary string (for send functions) */ +void type_append_to_binary_string(Oid type_oid, StringInfo data); +void datum_append_to_binary_string(DatumSerializer *serializer, StringInfo data, Datum datum); + +/* DESERIALIZATION */ +typedef struct DatumDeserializer DatumDeserializer; +DatumDeserializer *create_datum_deserializer(Oid type); + +/* deserialization from bytes in memory */ +Datum bytes_to_datum_and_advance(DatumDeserializer *deserializer, char **bytes); + +/* deserialization from binary strings (for recv functions) */ +Datum binary_string_to_datum(DatumDeserializer *deserializer, StringInfo data); +Oid binary_string_get_type(StringInfo data); + +#endif diff --git a/tsl/src/compression/segment_meta.c b/tsl/src/compression/segment_meta.c new file mode 100644 index 000000000..53f97f72e --- /dev/null +++ b/tsl/src/compression/segment_meta.c @@ -0,0 +1,258 @@ + +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include +#include + +#include "segment_meta.h" +#include "datum_serialize.h" + +typedef struct SegmentMetaMinMaxBuilder +{ + Oid type_oid; + bool empty; + bool has_null; + + SortSupportData ssup; + bool type_by_val; + int16 type_len; + Datum min; + Datum max; +} SegmentMetaMinMaxBuilder; + +typedef enum SegmentMetaMinMaxVersion +{ + /* Not a real version, if this does get used, it's a bug in the code */ + _INVALID_SEGMENT_MIN_MAX_VERSION = 0, + + SEGMENT_SEGMENT_MIN_MAX_V1, + + /* end of real values */ + _MAX_SEGMENT_MIN_MAX_VERSION = 128, +} SegmentMetaMinMaxVersion; + +typedef enum SegmentMetaMinMaxFlags +{ + /* Has nulls allows us to optimize IS NULL quals */ + HAS_NULLS = (1 << 0), + + /* All Nulls should result in a NULL value for entire SegmentMetaMinMax */ + _MAX_FLAG = (1 << 8), +} SegmentMetaMinMaxFlags; + +/* start must be aligned according to the alignment of the stored type */ +typedef struct SegmentMetaMinMax +{ + char vl_len_[4]; + uint8 version; /* SegmentMetaMinMaxVersion */ + uint8 flags; /* SegmentMetaMinMaxFlags */ + char padding[2]; + Oid type; + /* optional alignment padding for type */ + /*char data[FLEXIBLE_ARRAY_MEMBER]; bound values for two datums with alignment padding in + * between. First datum is min, second is max. Size determined by the datum type or the VARLENA + * header */ +} SegmentMetaMinMax; + +SegmentMetaMinMaxBuilder * +segment_meta_min_max_builder_create(Oid type_oid, Oid collation) +{ + SegmentMetaMinMaxBuilder *builder = palloc(sizeof(*builder)); + TypeCacheEntry *type = lookup_type_cache(type_oid, TYPECACHE_LT_OPR); + + if (!OidIsValid(type->lt_opr)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an less-than operator for type %s", + format_type_be(type_oid)))); + + *builder = (SegmentMetaMinMaxBuilder){ + .type_oid = type_oid, + .empty = true, + .has_null = false, + .type_by_val = type->typbyval, + .type_len = type->typlen, + }; + + builder->ssup.ssup_cxt = CurrentMemoryContext; + builder->ssup.ssup_collation = collation; + builder->ssup.ssup_nulls_first = false; + + PrepareSortSupportFromOrderingOp(type->lt_opr, &builder->ssup); + + return builder; +} + +void +segment_meta_min_max_builder_update_val(SegmentMetaMinMaxBuilder *builder, Datum val) +{ + int cmp; + + if (builder->empty) + { + builder->min = datumCopy(val, builder->type_by_val, builder->type_len); + builder->max = datumCopy(val, builder->type_by_val, builder->type_len); + builder->empty = false; + return; + } + + cmp = ApplySortComparator(builder->min, false, val, false, &builder->ssup); + if (cmp > 0) + builder->min = datumCopy(val, builder->type_by_val, builder->type_len); + + cmp = ApplySortComparator(builder->max, false, val, false, &builder->ssup); + if (cmp < 0) + builder->max = datumCopy(val, builder->type_by_val, builder->type_len); +} + +void +segment_meta_min_max_builder_update_null(SegmentMetaMinMaxBuilder *builder) +{ + builder->has_null = true; +} + +SegmentMetaMinMax * +segment_meta_min_max_builder_finish(SegmentMetaMinMaxBuilder *builder) +{ + SegmentMetaMinMax *res; + uint8 flags = 0; + Size total_size = sizeof(*res); + DatumSerializer *serializer; + char *data; + + if (builder->empty) + return NULL; + + serializer = create_datum_serializer(builder->type_oid); + + if (builder->has_null) + flags |= HAS_NULLS; + + if (builder->type_len == -1) + { + /* detoast if necessary. should never store toast pointers */ + builder->min = PointerGetDatum(PG_DETOAST_DATUM_PACKED(builder->min)); + builder->max = PointerGetDatum(PG_DETOAST_DATUM_PACKED(builder->max)); + } + + total_size = datum_get_bytes_size(serializer, total_size, builder->min); + total_size = datum_get_bytes_size(serializer, total_size, builder->max); + + res = palloc0(total_size); + *res = (SegmentMetaMinMax){ + .version = SEGMENT_SEGMENT_MIN_MAX_V1, + .flags = flags, + .type = builder->type_oid, + }; + + SET_VARSIZE(res, total_size); + + data = (char *) res + sizeof(*res); + total_size -= sizeof(*res); + + data = datum_to_bytes_and_advance(serializer, data, &total_size, builder->min); + data = datum_to_bytes_and_advance(serializer, data, &total_size, builder->max); + + Assert(total_size == 0); + + return res; +} + +static void +segment_meta_min_max_get_deconstruct(SegmentMetaMinMax *meta, DatumDeserializer *deser, Datum *min, + Datum *max) +{ + char *data = (char *) meta + sizeof(*meta); + /* skip the min */ + *min = bytes_to_datum_and_advance(deser, &data); + *max = bytes_to_datum_and_advance(deser, &data); +} + +bytea * +segment_meta_min_max_to_binary_string(SegmentMetaMinMax *meta) +{ + StringInfoData buf; + DatumDeserializer *deser = create_datum_deserializer(meta->type); + DatumSerializer *ser = create_datum_serializer(meta->type); + Datum min, max; + + segment_meta_min_max_get_deconstruct(meta, deser, &min, &max); + pq_begintypsend(&buf); + pq_sendbyte(&buf, meta->version); + pq_sendbyte(&buf, meta->flags); + type_append_to_binary_string(meta->type, &buf); + + datum_append_to_binary_string(ser, &buf, min); + datum_append_to_binary_string(ser, &buf, max); + + return pq_endtypsend(&buf); +} + +SegmentMetaMinMax * +segment_meta_min_max_from_binary_string(StringInfo buf) +{ + uint8 version = pq_getmsgbyte(buf); + + if (version == SEGMENT_SEGMENT_MIN_MAX_V1) + { + uint8 flags = pq_getmsgbyte(buf); + Oid type_oid = binary_string_get_type(buf); + DatumDeserializer *deser = create_datum_deserializer(type_oid); + TypeCacheEntry *type = lookup_type_cache(type_oid, 0); + SegmentMetaMinMaxBuilder builder = (SegmentMetaMinMaxBuilder){ + .type_oid = type_oid, + .empty = false, + .has_null = (flags & HAS_NULLS) != 0, + .type_by_val = type->typbyval, + .type_len = type->typlen, + .min = binary_string_to_datum(deser, buf), + .max = binary_string_to_datum(deser, buf), + }; + + return segment_meta_min_max_builder_finish(&builder); + } + else + elog(ERROR, "Unknown version number for segment meta min max: %d", version); +} + +Datum +tsl_segment_meta_min_max_get_min(Datum meta_datum, Oid type) +{ + SegmentMetaMinMax *meta = (SegmentMetaMinMax *) DatumGetPointer(meta_datum); + DatumDeserializer *deser; + Datum min, max; + + if (type != meta->type) + elog(ERROR, "wrong type requested from segment_meta_min_max"); + + deser = create_datum_deserializer(meta->type); + segment_meta_min_max_get_deconstruct(meta, deser, &min, &max); + return min; +} + +Datum +tsl_segment_meta_min_max_get_max(Datum meta_datum, Oid type) +{ + SegmentMetaMinMax *meta = (SegmentMetaMinMax *) DatumGetPointer(meta_datum); + DatumDeserializer *deser = create_datum_deserializer(meta->type); + Datum min, max; + + if (type != meta->type) + elog(ERROR, "wrong type requested from segment_meta_min_max"); + segment_meta_min_max_get_deconstruct(meta, deser, &min, &max); + return max; +} + +bool +tsl_segment_meta_min_max_has_null(Datum meta_datum) +{ + SegmentMetaMinMax *meta = (SegmentMetaMinMax *) DatumGetPointer(meta_datum); + return (meta->flags & HAS_NULLS) != 0; +} diff --git a/tsl/src/compression/segment_meta.h b/tsl/src/compression/segment_meta.h new file mode 100644 index 000000000..c31458845 --- /dev/null +++ b/tsl/src/compression/segment_meta.h @@ -0,0 +1,39 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_COMPRESSION_SEGMENT_META_H +#define TIMESCALEDB_TSL_COMPRESSION_SEGMENT_META_H + +#include +#include + +typedef struct SegmentMetaMinMax SegmentMetaMinMax; +typedef struct SegmentMetaMinMaxBuilder SegmentMetaMinMaxBuilder; + +SegmentMetaMinMaxBuilder *segment_meta_min_max_builder_create(Oid type, Oid collation); +void segment_meta_min_max_builder_update_val(SegmentMetaMinMaxBuilder *builder, Datum val); +void segment_meta_min_max_builder_update_null(SegmentMetaMinMaxBuilder *builder); +SegmentMetaMinMax *segment_meta_min_max_builder_finish(SegmentMetaMinMaxBuilder *builder); + +Datum tsl_segment_meta_min_max_get_min(Datum meta, Oid type); +Datum tsl_segment_meta_min_max_get_max(Datum meta, Oid type); +bool tsl_segment_meta_min_max_has_null(Datum meta); + +bytea *segment_meta_min_max_to_binary_string(SegmentMetaMinMax *meta); + +SegmentMetaMinMax *segment_meta_min_max_from_binary_string(StringInfo buf); + +static inline bytea * +tsl_segment_meta_min_max_send(Datum arg1) +{ + return segment_meta_min_max_to_binary_string((SegmentMetaMinMax *) DatumGetPointer(arg1)); +} + +static inline Datum +tsl_segment_meta_min_max_recv(StringInfo buf) +{ + return PointerGetDatum(segment_meta_min_max_from_binary_string(buf)); +} +#endif diff --git a/tsl/src/init.c b/tsl/src/init.c index f50132ce4..6cf1121b1 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -35,6 +35,7 @@ #include "hypertable.h" #include "compression/create.h" #include "compression/compress_utils.h" +#include "compression/segment_meta.h" #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; @@ -110,6 +111,11 @@ CrossModuleFunctions tsl_cm_functions = { .process_compress_table = tsl_process_compress_table, .compress_chunk = tsl_compress_chunk, .decompress_chunk = tsl_decompress_chunk, + .segment_meta_min_max_send = tsl_segment_meta_min_max_send, + .segment_meta_min_max_recv = tsl_segment_meta_min_max_recv, + .segment_meta_min_max_get_min = tsl_segment_meta_min_max_get_min, + .segment_meta_min_max_get_max = tsl_segment_meta_min_max_get_max, + .segment_meta_min_max_has_null = tsl_segment_meta_min_max_has_null, }; TS_FUNCTION_INFO_V1(ts_module_init); diff --git a/tsl/test/expected/compression_algos.out b/tsl/test/expected/compression_algos.out index 7614c3363..45ebcfcfc 100644 --- a/tsl/test/expected/compression_algos.out +++ b/tsl/test/expected/compression_algos.out @@ -34,7 +34,7 @@ $$ select (16807 * $1) % 2147483647 $$; create function gen_rand_minstd() returns bigint -language sql as +language sql security definer as $$ update rand_minstd_state set i = rand_minstd_advance(i) returning i $$; diff --git a/tsl/test/expected/compression_hypertable.out b/tsl/test/expected/compression_hypertable.out index 0cb58fa3b..8e43924db 100644 --- a/tsl/test/expected/compression_hypertable.out +++ b/tsl/test/expected/compression_hypertable.out @@ -15,7 +15,7 @@ $$ select (16807 * $1) % 2147483647 $$; create function gen_rand_minstd() returns bigint -language sql as +language sql security definer as $$ update rand_minstd_state set i = rand_minstd_advance(i) returning i $$; diff --git a/tsl/test/expected/compression_segment_meta.out b/tsl/test/expected/compression_segment_meta.out new file mode 100644 index 000000000..05a89e898 --- /dev/null +++ b/tsl/test/expected/compression_segment_meta.out @@ -0,0 +1,275 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_append(internal, ANYELEMENT) + RETURNS internal + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_append' + LANGUAGE C IMMUTABLE PARALLEL SAFE; +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_finish(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_finish' + LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT; +CREATE AGGREGATE _timescaledb_internal.segment_meta_min_max_agg(ANYELEMENT) ( + STYPE = internal, + SFUNC = _timescaledb_internal.tsl_segment_meta_min_max_append, + FINALFUNC = _timescaledb_internal.tsl_segment_meta_min_max_finish +); +\ir include/rand_generator.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license +-------------------------- +-- cheap rand generator -- +-------------------------- +create table rand_minstd_state(i bigint); +create function rand_minstd_advance(bigint) returns bigint +language sql immutable as +$$ + select (16807 * $1) % 2147483647 +$$; +create function gen_rand_minstd() returns bigint +language sql security definer as +$$ + update rand_minstd_state set i = rand_minstd_advance(i) returning i +$$; +-- seed the random num generator +insert into rand_minstd_state values (321); +--use a custom type without send and recv functions to test +--the input/output fallback path. +CREATE TYPE customtype_no_send_recv; +CREATE OR REPLACE FUNCTION customtype_in(cstring) RETURNS customtype_no_send_recv AS +'timestamptz_in' +LANGUAGE internal IMMUTABLE STRICT; +NOTICE: return type customtype_no_send_recv is only a shell +CREATE OR REPLACE FUNCTION customtype_out( customtype_no_send_recv) RETURNS cstring AS +'timestamptz_out' +LANGUAGE internal IMMUTABLE STRICT; +NOTICE: argument type customtype_no_send_recv is only a shell +CREATE TYPE customtype_no_send_recv ( + INPUT = customtype_in, + OUTPUT = customtype_out, + INTERNALLENGTH = 8, + PASSEDBYVALUE, + ALIGNMENT = double, + STORAGE = plain +); +CREATE CAST (customtype_no_send_recv AS bigint) +WITHOUT FUNCTION AS IMPLICIT; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE metric (i int); +insert into metric select i from generate_series(1, 10) i; +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_get_max(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_has_null(meta) +FROM +( + SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) as meta + FROM metric +) AS meta_gen; + segment_meta_min_max_get_min | segment_meta_min_max_get_max | segment_meta_min_max_has_null +------------------------------+------------------------------+------------------------------- + 1 | 10 | f +(1 row) + +\set TYPE int +\set TABLE metric +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +----NULL tests +--First +truncate metric; +insert into metric select NULLIF(i,1) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--Last +truncate metric; +insert into metric select NULLIF(i,10) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--Middle +truncate metric; +insert into metric select NULLIF(i,5) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--All NULLS should return null object +truncate metric; +insert into metric select NULL from generate_series(1, 10) i; +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) is NULL, + _timescaledb_internal.segment_meta_min_max_agg(i)::text is NULL +FROM metric; + ?column? | ?column? +----------+---------- + t | t +(1 row) + +--accessor functions work on NULLs +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_get_max(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_has_null(NULL); + ?column? | ?column? | segment_meta_min_max_has_null +----------+----------+------------------------------- + t | t | t +(1 row) + +-- +--type tests +-- +--untoasted text +CREATE TABLE base_texts AS SELECT + repeat(item::text, 1) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE text +\set TABLE base_texts +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--toasted text +DROP TABLE base_texts; +CREATE TABLE base_texts AS SELECT + repeat(item::text, 100000) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +--make sure it's toasted +SELECT pg_total_relation_size(reltoastrelid) + FROM pg_class c + WHERE relname = 'base_texts'; + pg_total_relation_size +------------------------ + 24576 +(1 row) + +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--name is a fixed-length pass by reference type +CREATE TABLE base_name AS SELECT + item::name as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE name +\set TABLE base_name +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--array +CREATE TABLE text_array AS SELECT + array[item::text, 'abab'] as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE text[] +\set TABLE text_array +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--Points doesn't have an ordering so make sure it errors +CREATE TABLE points AS SELECT + point '(0,1)' as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set ON_ERROR_STOP 0 +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) +FROM points; +ERROR: could not identify an less-than operator for type point +\set ON_ERROR_STOP 1 +--test with a custom type with no send/recv +CREATE TABLE customtype_table AS SELECT + item::text::customtype_no_send_recv as i + FROM + (SELECT sub.item from + (SELECT generate_series('2001-01-01 01:01:01', '2001-01-02 01:01:01', INTERVAL '1 hour') item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE customtype_no_send_recv +\set TABLE customtype_table +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 941eb76f2..ebbebc84a 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -18,6 +18,7 @@ set(TEST_FILES_DEBUG compression_algos.sql compression_errors.sql compression_hypertable.sql + compression_segment_meta.sql continuous_aggs.sql continuous_aggs_bgw.sql continuous_aggs_materialize.sql diff --git a/tsl/test/sql/compression_segment_meta.sql b/tsl/test/sql/compression_segment_meta.sql new file mode 100644 index 000000000..d8544c839 --- /dev/null +++ b/tsl/test/sql/compression_segment_meta.sql @@ -0,0 +1,190 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license + +\c :TEST_DBNAME :ROLE_SUPERUSER + +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_append(internal, ANYELEMENT) + RETURNS internal + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_append' + LANGUAGE C IMMUTABLE PARALLEL SAFE; + +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_finish(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_finish' + LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT; + +CREATE AGGREGATE _timescaledb_internal.segment_meta_min_max_agg(ANYELEMENT) ( + STYPE = internal, + SFUNC = _timescaledb_internal.tsl_segment_meta_min_max_append, + FINALFUNC = _timescaledb_internal.tsl_segment_meta_min_max_finish +); + +\ir include/rand_generator.sql + +--use a custom type without send and recv functions to test +--the input/output fallback path. +CREATE TYPE customtype_no_send_recv; + +CREATE OR REPLACE FUNCTION customtype_in(cstring) RETURNS customtype_no_send_recv AS +'timestamptz_in' +LANGUAGE internal IMMUTABLE STRICT; +CREATE OR REPLACE FUNCTION customtype_out( customtype_no_send_recv) RETURNS cstring AS +'timestamptz_out' +LANGUAGE internal IMMUTABLE STRICT; + +CREATE TYPE customtype_no_send_recv ( + INPUT = customtype_in, + OUTPUT = customtype_out, + INTERNALLENGTH = 8, + PASSEDBYVALUE, + ALIGNMENT = double, + STORAGE = plain +); + +CREATE CAST (customtype_no_send_recv AS bigint) +WITHOUT FUNCTION AS IMPLICIT; + + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER + +CREATE TABLE metric (i int); +insert into metric select i from generate_series(1, 10) i; + +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_get_max(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_has_null(meta) +FROM +( + SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) as meta + FROM metric +) AS meta_gen; + +\set TYPE int +\set TABLE metric +\ir include/compression_test_segment_meta.sql + +----NULL tests +--First +truncate metric; +insert into metric select NULLIF(i,1) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql + +--Last +truncate metric; +insert into metric select NULLIF(i,10) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql + +--Middle +truncate metric; +insert into metric select NULLIF(i,5) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql + +--All NULLS should return null object +truncate metric; +insert into metric select NULL from generate_series(1, 10) i; + +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) is NULL, + _timescaledb_internal.segment_meta_min_max_agg(i)::text is NULL +FROM metric; + +--accessor functions work on NULLs +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_get_max(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_has_null(NULL); + + + +-- +--type tests +-- + +--untoasted text +CREATE TABLE base_texts AS SELECT + repeat(item::text, 1) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE text +\set TABLE base_texts +\ir include/compression_test_segment_meta.sql + +--toasted text +DROP TABLE base_texts; +CREATE TABLE base_texts AS SELECT + repeat(item::text, 100000) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +--make sure it's toasted +SELECT pg_total_relation_size(reltoastrelid) + FROM pg_class c + WHERE relname = 'base_texts'; + +\ir include/compression_test_segment_meta.sql + +--name is a fixed-length pass by reference type +CREATE TABLE base_name AS SELECT + item::name as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE name +\set TABLE base_name +\ir include/compression_test_segment_meta.sql + + +--array + +CREATE TABLE text_array AS SELECT + array[item::text, 'abab'] as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE text[] +\set TABLE text_array +\ir include/compression_test_segment_meta.sql + +--Points doesn't have an ordering so make sure it errors +CREATE TABLE points AS SELECT + point '(0,1)' as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set ON_ERROR_STOP 0 +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) +FROM points; +\set ON_ERROR_STOP 1 + +--test with a custom type with no send/recv + +CREATE TABLE customtype_table AS SELECT + item::text::customtype_no_send_recv as i + FROM + (SELECT sub.item from + (SELECT generate_series('2001-01-01 01:01:01', '2001-01-02 01:01:01', INTERVAL '1 hour') item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE customtype_no_send_recv +\set TABLE customtype_table +\ir include/compression_test_segment_meta.sql diff --git a/tsl/test/sql/include/compression_test_segment_meta.sql b/tsl/test/sql/include/compression_test_segment_meta.sql new file mode 100644 index 000000000..f507331db --- /dev/null +++ b/tsl/test/sql/include/compression_test_segment_meta.sql @@ -0,0 +1,28 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\set ECHO errors + +SELECT 'NULL::'||:'TYPE' as "NULLTYPE" \gset + +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i)::text as "META_TEXT", + min(i) as "TRUE_MIN", + max(i) as "TRUE_MAX", + (count(*)-count(i)) > 0 as "TRUE_HAS_NULL" +FROM :"TABLE" \gset + +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(meta, :NULLTYPE) = :'TRUE_MIN' as min_correct, + _timescaledb_internal.segment_meta_min_max_get_max(meta, :NULLTYPE) = :'TRUE_MAX' as max_correct, + _timescaledb_internal.segment_meta_min_max_has_null(meta) = :'TRUE_HAS_NULL' as has_null_correct +FROM +( + SELECT + :'META_TEXT'::_timescaledb_internal.segment_meta_min_max as meta +) AS meta_gen; + + + +\set ECHO all diff --git a/tsl/test/sql/include/rand_generator.sql b/tsl/test/sql/include/rand_generator.sql index 35d39653d..482924ac2 100644 --- a/tsl/test/sql/include/rand_generator.sql +++ b/tsl/test/sql/include/rand_generator.sql @@ -14,7 +14,7 @@ $$ $$; create function gen_rand_minstd() returns bigint -language sql as +language sql security definer as $$ update rand_minstd_state set i = rand_minstd_advance(i) returning i $$; diff --git a/tsl/test/src/test_compression.c b/tsl/test/src/test_compression.c index a36a6dbb6..e95c1846a 100644 --- a/tsl/test/src/test_compression.c +++ b/tsl/test/src/test_compression.c @@ -28,6 +28,7 @@ #include "compression/gorilla.h" #include "compression/deltadelta.h" #include "compression/utils.h" +#include "compression/segment_meta.h" #define VEC_PREFIX compression_info #define VEC_ELEMENT_TYPE Form_hypertable_compression @@ -543,3 +544,53 @@ ts_decompress_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +TS_FUNCTION_INFO_V1(tsl_segment_meta_min_max_append); + +Datum +tsl_segment_meta_min_max_append(PG_FUNCTION_ARGS) +{ + SegmentMetaMinMaxBuilder *builder = + (SegmentMetaMinMaxBuilder *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + MemoryContext agg_context; + MemoryContext old_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + { + /* cannot be called directly because of internal-type argument */ + elog(ERROR, "tsl_segment_meta_min_max_append called in non-aggregate context"); + } + + old_context = MemoryContextSwitchTo(agg_context); + + if (builder == NULL) + { + Oid type_to_compress = get_fn_expr_argtype(fcinfo->flinfo, 1); + builder = segment_meta_min_max_builder_create(type_to_compress, fcinfo->fncollation); + } + if (PG_ARGISNULL(1)) + segment_meta_min_max_builder_update_null(builder); + else + segment_meta_min_max_builder_update_val(builder, PG_GETARG_DATUM(1)); + + MemoryContextSwitchTo(old_context); + PG_RETURN_POINTER(builder); +} + +TS_FUNCTION_INFO_V1(tsl_segment_meta_min_max_finish); +Datum +tsl_segment_meta_min_max_finish(PG_FUNCTION_ARGS) +{ + SegmentMetaMinMaxBuilder *builder = + (SegmentMetaMinMaxBuilder *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + SegmentMetaMinMax *res; + + if (builder == NULL) + PG_RETURN_NULL(); + + res = segment_meta_min_max_builder_finish(builder); + if (res == NULL) + PG_RETURN_NULL(); + + PG_RETURN_POINTER(res); +}