diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 3795f7e3c..b4e3269f5 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -21,6 +21,7 @@ set(IMMUTABLE_API_SOURCE_FILES set(SOURCE_FILES hypertable.sql chunk.sql + compression.sql ddl_internal.sql edition.sql util_time.sql diff --git a/sql/compression.sql b/sql/compression.sql new file mode 100644 index 000000000..5d7a07a25 --- /dev/null +++ b/sql/compression.sql @@ -0,0 +1,18 @@ +-- This file and its contents are licensed under the Apache License 2.0. +-- Please see the included NOTICE for copyright information and +-- LICENSE-APACHE for a copy of the license. + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_get_min(_timescaledb_internal.segment_meta_min_max, ANYELEMENT) + RETURNS ANYELEMENT + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_get_min' + LANGUAGE C IMMUTABLE; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_get_max(_timescaledb_internal.segment_meta_min_max, ANYELEMENT) + RETURNS ANYELEMENT + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_get_max' + LANGUAGE C IMMUTABLE; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_has_null(_timescaledb_internal.segment_meta_min_max) + RETURNS boolean + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_has_null' + LANGUAGE C IMMUTABLE; diff --git a/sql/pre_install/types.sql b/sql/pre_install/types.sql index 6a053455b..ccde98aa8 100644 --- a/sql/pre_install/types.sql +++ b/sql/pre_install/types.sql @@ -57,3 +57,40 @@ CREATE TYPE _timescaledb_internal.compressed_data ( RECEIVE = _timescaledb_internal.compressed_data_recv, SEND = _timescaledb_internal.compressed_data_send ); + +-- +-- _timescaledb_internal.segment_meta_min_max keeps the min/max range of compressed data +-- + +CREATE TYPE _timescaledb_internal.segment_meta_min_max; + +--the textual input/output is simply base64 encoding of the binary representation +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_in(CSTRING) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_in' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_out(_timescaledb_internal.segment_meta_min_max) + RETURNS CSTRING + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_out' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_send(_timescaledb_internal.segment_meta_min_max) + RETURNS BYTEA + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_send' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_recv(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_recv' + LANGUAGE C IMMUTABLE STRICT; + +CREATE TYPE _timescaledb_internal.segment_meta_min_max ( + INTERNALLENGTH = VARIABLE, + STORAGE = EXTERNAL, --move to toast, don't compress + ALIGNMENT = DOUBLE, --needed for alignment to work with arbitrary datums + INPUT = _timescaledb_internal.segment_meta_min_max_in, + OUTPUT = _timescaledb_internal.segment_meta_min_max_out, + RECEIVE = _timescaledb_internal.segment_meta_min_max_recv, + SEND = _timescaledb_internal.segment_meta_min_max_send +); diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index 8c4583444..c5f095eef 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -211,5 +211,42 @@ insert into _timescaledb_catalog.compression_algorithm values ( 2, 1, 'COMPRESSION_ALGORITHM_DICTIONARY', 'dictionary'), ( 3, 1, 'COMPRESSION_ALGORITHM_GORILLA', 'gorilla'), ( 4, 1, 'COMPRESSION_ALGORITHM_DELTADELTA', 'deltadelta') -on conflict(id) do update set (version, name, description) +on conflict(id) do update set (version, name, description) = (excluded.version, excluded.name, excluded.description); + +-- +-- _timescaledb_internal.segment_meta_min_max keeps the min/max range of compressed data +-- + +CREATE TYPE _timescaledb_internal.segment_meta_min_max; + +--the textual input/output is simply base64 encoding of the binary representation +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_in(CSTRING) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_in' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_out(_timescaledb_internal.segment_meta_min_max) + RETURNS CSTRING + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_out' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_send(_timescaledb_internal.segment_meta_min_max) + RETURNS BYTEA + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_send' + LANGUAGE C IMMUTABLE STRICT; + +CREATE FUNCTION _timescaledb_internal.segment_meta_min_max_recv(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS '@MODULE_PATHNAME@', 'ts_segment_meta_min_max_recv' + LANGUAGE C IMMUTABLE STRICT; + +CREATE TYPE _timescaledb_internal.segment_meta_min_max ( + INTERNALLENGTH = VARIABLE, + STORAGE = EXTERNAL, --move to toast, don't compress + ALIGNMENT = DOUBLE, --needed for alignment to work with arbitrary datums + INPUT = _timescaledb_internal.segment_meta_min_max_in, + OUTPUT = _timescaledb_internal.segment_meta_min_max_out, + RECEIVE = _timescaledb_internal.segment_meta_min_max_recv, + SEND = _timescaledb_internal.segment_meta_min_max_send +); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fc9e33758..ebef9e1c6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -18,6 +18,7 @@ set(SOURCES cross_module_fn.c copy.c compression_with_clause.c + compression_segment_meta_min_max.c dimension.c dimension_slice.c dimension_vector.c diff --git a/src/compression_segment_meta_min_max.c b/src/compression_segment_meta_min_max.c new file mode 100644 index 000000000..ecc296237 --- /dev/null +++ b/src/compression_segment_meta_min_max.c @@ -0,0 +1,107 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. + * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. + */ + +#include + +#include "cross_module_fn.h" +#include "compat.h" +#include "base64_compat.h" +#include "license_guc.h" + +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_send); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_recv); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_out); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_in); + +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_get_min); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_get_max); +TS_FUNCTION_INFO_V1(ts_segment_meta_min_max_has_null); + +Datum +ts_segment_meta_min_max_send(PG_FUNCTION_ARGS) +{ + Datum meta = PG_GETARG_DATUM(0); + PG_RETURN_DATUM(PointerGetDatum(ts_cm_functions->segment_meta_min_max_send(meta))); +} + +Datum +ts_segment_meta_min_max_recv(PG_FUNCTION_ARGS) +{ + StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); + PG_RETURN_DATUM(ts_cm_functions->segment_meta_min_max_recv(buf)); +} + +Datum +ts_segment_meta_min_max_out(PG_FUNCTION_ARGS) +{ + Datum meta = PG_GETARG_DATUM(0); + bytea *bytes = ts_cm_functions->segment_meta_min_max_send(meta); + + int raw_len = VARSIZE_ANY_EXHDR(bytes); + const char *raw_data = VARDATA(bytes); + int encoded_len = pg_b64_enc_len(raw_len); + char *encoded = palloc(encoded_len + 1); + encoded_len = pg_b64_encode(raw_data, raw_len, encoded); + encoded[encoded_len] = '\0'; + + PG_RETURN_CSTRING(encoded); +} + +Datum +ts_segment_meta_min_max_in(PG_FUNCTION_ARGS) +{ + const char *input = PG_GETARG_CSTRING(0); + size_t input_len = strlen(input); + int decoded_len; + char *decoded; + StringInfoData data; + + /* Load TSL explicitly in case this is called during parsing */ + ts_license_enable_module_loading(); + + if (input_len > PG_INT32_MAX) + elog(ERROR, "input too long"); + + decoded_len = pg_b64_dec_len(input_len); + decoded = palloc(decoded_len + 1); + decoded_len = pg_b64_decode(input, input_len, decoded); + decoded[decoded_len] = '\0'; + data = (StringInfoData){ + .data = decoded, + .len = decoded_len, + .maxlen = decoded_len, + }; + + PG_RETURN_DATUM(ts_cm_functions->segment_meta_min_max_recv(&data)); +} + +Datum +ts_segment_meta_min_max_get_min(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + PG_RETURN_DATUM(PointerGetDatum( + ts_cm_functions->segment_meta_min_max_get_min(PG_GETARG_DATUM(0), + get_fn_expr_argtype(fcinfo->flinfo, 1)))); +} + +Datum +ts_segment_meta_min_max_get_max(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + PG_RETURN_DATUM(PointerGetDatum( + ts_cm_functions->segment_meta_min_max_get_max(PG_GETARG_DATUM(0), + get_fn_expr_argtype(fcinfo->flinfo, 1)))); +} + +Datum +ts_segment_meta_min_max_has_null(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + PG_RETURN_BOOL(true); + PG_RETURN_BOOL(ts_cm_functions->segment_meta_min_max_has_null(PG_GETARG_DATUM(0))); +} diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 069d02b35..68b4faedf 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -336,6 +336,41 @@ continuous_agg_drop_chunks_by_chunk_id_default(int32 raw_hypertable_id, Chunk ** error_no_default_fn_community(); } +static bytea * +segment_meta_min_max_send_default(Datum arg1) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static Datum +segment_meta_min_max_recv_default(StringInfo buf) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static Datum +segment_meta_min_max_get_min_default(Datum meta, Oid type) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static Datum +segment_meta_min_max_get_max_default(Datum meta, Oid type) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + +static bool +segment_meta_min_max_has_null_default(Datum meta) +{ + error_no_default_fn_enterprise(); + pg_unreachable(); +} + /* * Define cross-module functions' default values: * If the submodule isn't activated, using one of the cm functions will throw an @@ -377,12 +412,22 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .continuous_agg_drop_chunks_by_chunk_id = continuous_agg_drop_chunks_by_chunk_id_default, .continuous_agg_trigfn = error_no_default_fn_pg_community, .continuous_agg_update_options = continuous_agg_update_options_default, - .compressed_data_decompress_forward = error_no_default_fn_pg_community, - .compressed_data_decompress_reverse = error_no_default_fn_pg_community, + .compressed_data_send = error_no_default_fn_pg_community, .compressed_data_recv = error_no_default_fn_pg_community, .compressed_data_in = error_no_default_fn_pg_community, .compressed_data_out = error_no_default_fn_pg_community, + .process_compress_table = process_compress_table_default, + .compress_chunk = error_no_default_fn_pg_enterprise, + .decompress_chunk = error_no_default_fn_pg_enterprise, + .segment_meta_min_max_send = segment_meta_min_max_send_default, + .segment_meta_min_max_recv = segment_meta_min_max_recv_default, + .segment_meta_min_max_get_min = segment_meta_min_max_get_min_default, + .segment_meta_min_max_get_max = segment_meta_min_max_get_max_default, + .segment_meta_min_max_has_null = segment_meta_min_max_has_null_default, + + .compressed_data_decompress_forward = error_no_default_fn_pg_community, + .compressed_data_decompress_reverse = error_no_default_fn_pg_community, .deltadelta_compressor_append = error_no_default_fn_pg_community, .deltadelta_compressor_finish = error_no_default_fn_pg_community, .gorilla_compressor_append = error_no_default_fn_pg_community, @@ -391,9 +436,6 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .dictionary_compressor_finish = error_no_default_fn_pg_community, .array_compressor_append = error_no_default_fn_pg_community, .array_compressor_finish = error_no_default_fn_pg_community, - .process_compress_table = process_compress_table_default, - .compress_chunk = error_no_default_fn_pg_enterprise, - .decompress_chunk = error_no_default_fn_pg_enterprise, }; TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default; diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 7668a0a32..3a4215943 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -69,6 +69,20 @@ typedef struct CrossModuleFunctions void (*continuous_agg_update_options)(ContinuousAgg *cagg, WithClauseResult *with_clause_options); + PGFunction compressed_data_send; + PGFunction compressed_data_recv; + PGFunction compressed_data_in; + PGFunction compressed_data_out; + bool (*process_compress_table)(AlterTableCmd *cmd, Hypertable *ht, + WithClauseResult *with_clause_options); + PGFunction compress_chunk; + PGFunction decompress_chunk; + bytea *(*segment_meta_min_max_send)(Datum); + Datum (*segment_meta_min_max_recv)(StringInfo buf); + Datum (*segment_meta_min_max_get_min)(Datum, Oid type); + Datum (*segment_meta_min_max_get_max)(Datum, Oid type); + bool (*segment_meta_min_max_has_null)(Datum); + /* The compression functions below are not installed in SQL as part of create extension; * They are installed and tested during testing scripts. They are exposed in cross-module * functions because they may be very useful for debugging customer problems if the sql @@ -76,10 +90,6 @@ typedef struct CrossModuleFunctions */ PGFunction compressed_data_decompress_forward; PGFunction compressed_data_decompress_reverse; - PGFunction compressed_data_send; - PGFunction compressed_data_recv; - PGFunction compressed_data_in; - PGFunction compressed_data_out; PGFunction deltadelta_compressor_append; PGFunction deltadelta_compressor_finish; PGFunction gorilla_compressor_append; @@ -88,10 +98,6 @@ typedef struct CrossModuleFunctions PGFunction dictionary_compressor_finish; PGFunction array_compressor_append; PGFunction array_compressor_finish; - bool (*process_compress_table)(AlterTableCmd *cmd, Hypertable *ht, - WithClauseResult *with_clause_options); - PGFunction compress_chunk; - PGFunction decompress_chunk; } CrossModuleFunctions; extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions; diff --git a/tsl/src/compression/CMakeLists.txt b/tsl/src/compression/CMakeLists.txt index 2a9a3a9ad..efc81224f 100644 --- a/tsl/src/compression/CMakeLists.txt +++ b/tsl/src/compression/CMakeLists.txt @@ -3,8 +3,10 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/compression.c ${CMAKE_CURRENT_SOURCE_DIR}/create.c ${CMAKE_CURRENT_SOURCE_DIR}/compress_utils.c + ${CMAKE_CURRENT_SOURCE_DIR}/datum_serialize.c ${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c ${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c ${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c + ${CMAKE_CURRENT_SOURCE_DIR}/segment_meta.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/compression/datum_serialize.c b/tsl/src/compression/datum_serialize.c new file mode 100644 index 000000000..860930b80 --- /dev/null +++ b/tsl/src/compression/datum_serialize.c @@ -0,0 +1,388 @@ + +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "datum_serialize.h" +#include "compat.h" + +typedef struct DatumSerializer +{ + Oid type_oid; + bool type_by_val; + int16 type_len; + char type_align; + char type_storage; + Oid type_send; + Oid type_out; + + /* lazy load */ + bool send_info_set; + FmgrInfo send_flinfo; + bool use_binary_send; +} DatumSerializer; + +DatumSerializer * +create_datum_serializer(Oid type_oid) +{ + DatumSerializer *res = palloc(sizeof(*res)); + /* we use the syscache and not the type cache here b/c we need the + * send/recv in/out functions that aren't in type cache */ + Form_pg_type type; + HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", type_oid); + type = (Form_pg_type) GETSTRUCT(tup); + + *res = (DatumSerializer){ + .type_oid = type_oid, + .type_by_val = type->typbyval, + .type_len = type->typlen, + .type_align = type->typalign, + .type_storage = type->typstorage, + .type_send = type->typsend, + .type_out = type->typoutput, + }; + + ReleaseSysCache(tup); + return res; +} + +static inline void +load_send_fn(DatumSerializer *ser) +{ + if (ser->send_info_set) + return; + + ser->send_info_set = true; + + ser->use_binary_send = OidIsValid(ser->type_send); + + if (ser->use_binary_send) + fmgr_info(ser->type_send, &ser->send_flinfo); + else + fmgr_info(ser->type_out, &ser->send_flinfo); +} + +#define TYPE_IS_PACKABLE(typlen, typstorage) ((typlen) == -1 && (typstorage) != 'p') + +/* Inspired by datum_compute_size in rangetypes.c */ +Size +datum_get_bytes_size(DatumSerializer *serializer, Size start_offset, Datum val) +{ + Size data_length = start_offset; + + if (serializer->type_len == -1) + { + /* varlena */ + Pointer ptr = DatumGetPointer(val); + + if (VARATT_IS_EXTERNAL(ptr)) + { + /* + * Throw error, because we should never get a toasted datum. + * Caller should have detoasted it. + */ + elog(ERROR, "datum should be detoasted before passed to datum_get_bytes_size"); + } + } + + if (TYPE_IS_PACKABLE(serializer->type_len, serializer->type_storage) && + VARATT_CAN_MAKE_SHORT(DatumGetPointer(val))) + { + /* + * we're anticipating converting to a short varlena header, so adjust + * length and don't count any alignment (the case where the Datum is already + * in short format is handled by att_align_datum) + */ + data_length += VARATT_CONVERTED_SHORT_SIZE(DatumGetPointer(val)); + } + else + { + data_length = + att_align_datum(data_length, serializer->type_align, serializer->type_len, val); + data_length = att_addlength_datum(data_length, serializer->type_len, val); + } + + return data_length; +} + +static void +check_allowed_data_len(Size data_length, Size max_size) +{ + if (max_size < data_length) + elog(ERROR, "trying to serialize more data than was allocated"); +} + +static inline char * +align_and_zero(char *ptr, char type_align, Size *max_size) +{ + char *new_pos = (char *) att_align_nominal(ptr, type_align); + if (new_pos != ptr) + { + Size padding = new_pos - ptr; + check_allowed_data_len(padding, *max_size); + memset(ptr, 0, padding); + *max_size = *max_size - padding; + } + return new_pos; +} + +/* Inspired by datum_write in rangetypes.c. This reduces the max_size by the data length before + * exiting */ +char * +datum_to_bytes_and_advance(DatumSerializer *serializer, char *ptr, Size *max_size, Datum datum) +{ + Size data_length; + + if (serializer->type_by_val) + { + /* pass-by-value */ + ptr = align_and_zero(ptr, serializer->type_align, max_size); + data_length = serializer->type_len; + check_allowed_data_len(data_length, *max_size); + store_att_byval(ptr, datum, data_length); + } + else if (serializer->type_len == -1) + { + /* varlena */ + Pointer val = DatumGetPointer(datum); + + if (VARATT_IS_EXTERNAL(val)) + { + /* + * Throw error, because we should never get a toast datum. + * Caller should have detoasted it. + */ + elog(ERROR, "datum should be detoasted before passed to datum_to_bytes_and_advance"); + data_length = 0; /* keep compiler quiet */ + } + else if (VARATT_IS_SHORT(val)) + { + /* no alignment for short varlenas */ + data_length = VARSIZE_SHORT(val); + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, val, data_length); + } + else if (TYPE_IS_PACKABLE(serializer->type_len, serializer->type_storage) && + VARATT_CAN_MAKE_SHORT(val)) + { + /* convert to short varlena -- no alignment */ + data_length = VARATT_CONVERTED_SHORT_SIZE(val); + check_allowed_data_len(data_length, *max_size); + SET_VARSIZE_SHORT(ptr, data_length); + memcpy(ptr + 1, VARDATA(val), data_length - 1); + } + else + { + /* full 4-byte header varlena */ + ptr = align_and_zero(ptr, serializer->type_align, max_size); + data_length = VARSIZE(val); + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, val, data_length); + } + } + else if (serializer->type_len == -2) + { + /* cstring ... never needs alignment */ + Assert(serializer->type_align == 'c'); + data_length = strlen(DatumGetCString(datum)) + 1; + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, DatumGetPointer(datum), data_length); + } + else + { + /* fixed-length pass-by-reference */ + ptr = align_and_zero(ptr, serializer->type_align, max_size); + Assert(serializer->type_len > 0); + data_length = serializer->type_len; + check_allowed_data_len(data_length, *max_size); + memcpy(ptr, DatumGetPointer(datum), data_length); + } + + ptr += data_length; + *max_size = *max_size - data_length; + + return ptr; +} + +typedef struct DatumDeserializer +{ + bool type_by_val; + int16 type_len; + char type_align; + char type_storage; + + Oid type_recv; + + Oid type_in; + Oid type_io_param; + int32 type_mod; + /* lazy load */ + bool recv_info_set; + FmgrInfo recv_flinfo; + bool use_binary_recv; +} DatumDeserializer; + +DatumDeserializer * +create_datum_deserializer(Oid type_oid) +{ + DatumDeserializer *res = palloc(sizeof(*res)); + /* we use the syscache and not the type cache here b/c we need the + * send/recv in/out functions that aren't in type cache */ + Form_pg_type type; + HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", type_oid); + type = (Form_pg_type) GETSTRUCT(tup); + + *res = (DatumDeserializer){ + .type_by_val = type->typbyval, + .type_len = type->typlen, + .type_align = type->typalign, + .type_storage = type->typstorage, + .type_recv = type->typreceive, + .type_in = type->typinput, + .type_io_param = getTypeIOParam(tup), + .type_mod = type->typtypmod, + }; + + ReleaseSysCache(tup); + return res; +} + +static inline void +load_recv_fn(DatumDeserializer *des, bool use_binary) +{ + if (des->recv_info_set && des->use_binary_recv == use_binary) + return; + + des->recv_info_set = true; + des->use_binary_recv = use_binary; + + if (des->use_binary_recv) + fmgr_info(des->type_recv, &des->recv_flinfo); + else + fmgr_info(des->type_in, &des->recv_flinfo); +} + +/* Loosely based on `range_deserialize` in rangetypes.c */ +Datum +bytes_to_datum_and_advance(DatumDeserializer *deserializer, char **ptr) +{ + Datum res; + + /* att_align_pointer can handle the case where an unaligned short-varlen follows any other + * varlen by detecting padding. padding bytes _must always_ be set to 0, while the first byte of + * a varlen header is _never_ 0. This means that if the next byte is non-zero, it must be the + * start of a short-varlen, otherwise we need to align the pointer. + */ + + *ptr = + (Pointer) att_align_pointer(*ptr, deserializer->type_align, deserializer->type_len, *ptr); + res = fetch_att(*ptr, deserializer->type_by_val, deserializer->type_len); + *ptr = att_addlength_pointer(*ptr, deserializer->type_len, *ptr); + return res; +} + +void +type_append_to_binary_string(Oid type_oid, StringInfo buffer) +{ + Form_pg_type type_tuple; + HeapTuple tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid)); + char *namespace_name; + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", type_oid); + + type_tuple = (Form_pg_type) GETSTRUCT(tup); + + namespace_name = get_namespace_name(type_tuple->typnamespace); + + pq_sendstring(buffer, namespace_name); + pq_sendstring(buffer, NameStr(type_tuple->typname)); + + ReleaseSysCache(tup); +} + +Oid +binary_string_get_type(StringInfo buffer) +{ + const char *element_type_namespace = pq_getmsgstring(buffer); + const char *element_type_name = pq_getmsgstring(buffer); + Oid namespace_oid; + Oid type_oid; + + namespace_oid = LookupExplicitNamespace(element_type_namespace, false); + + type_oid = GetSysCacheOid2(TYPENAMENSP, + PointerGetDatum(element_type_name), + ObjectIdGetDatum(namespace_oid)); + if (!OidIsValid(type_oid)) + elog(ERROR, "could not find type %s.%s", element_type_namespace, element_type_name); + + return type_oid; +} + +void +datum_append_to_binary_string(DatumSerializer *serializer, StringInfo buffer, Datum datum) +{ + load_send_fn(serializer); + pq_sendbyte(buffer, serializer->use_binary_send); + if (serializer->use_binary_send) + { + bytea *output = SendFunctionCall(&serializer->send_flinfo, datum); + pq_sendint32(buffer, VARSIZE_ANY_EXHDR(output)); + pq_sendbytes(buffer, VARDATA(output), VARSIZE_ANY_EXHDR(output)); + } + else + { + char *output = OutputFunctionCall(&serializer->send_flinfo, datum); + pq_sendstring(buffer, output); + } +} + +Datum +binary_string_to_datum(DatumDeserializer *deserializer, StringInfo buffer) +{ + bool use_binary_recv = pq_getmsgbyte(buffer) != 0; + Datum res; + load_recv_fn(deserializer, use_binary_recv); + + if (use_binary_recv) + { + uint32 data_size = pq_getmsgint32(buffer); + const char *bytes = pq_getmsgbytes(buffer, data_size); + StringInfoData d = { + .data = (char *) bytes, + .len = data_size, + .maxlen = data_size, + }; + res = ReceiveFunctionCall(&deserializer->recv_flinfo, + &d, + deserializer->type_io_param, + deserializer->type_mod); + } + else + { + const char *string = pq_getmsgstring(buffer); + res = InputFunctionCall(&deserializer->recv_flinfo, + (char *) string, + deserializer->type_io_param, + deserializer->type_mod); + } + return res; +} diff --git a/tsl/src/compression/datum_serialize.h b/tsl/src/compression/datum_serialize.h new file mode 100644 index 000000000..39a0fb274 --- /dev/null +++ b/tsl/src/compression/datum_serialize.h @@ -0,0 +1,36 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_COMPRESSION_DATUM_SERIALIZE_H +#define TIMESCALEDB_TSL_COMPRESSION_DATUM_SERIALIZE_H + +#include +#include + +/* SERIALIZATION */ +typedef struct DatumSerializer DatumSerializer; +DatumSerializer *create_datum_serializer(Oid type); + +/* serialize to bytes in memory. */ +Size datum_get_bytes_size(DatumSerializer *serializer, Size start_offset, Datum val); +char *datum_to_bytes_and_advance(DatumSerializer *serializer, char *start, Size *max_size, + Datum val); + +/* serialize to a binary string (for send functions) */ +void type_append_to_binary_string(Oid type_oid, StringInfo data); +void datum_append_to_binary_string(DatumSerializer *serializer, StringInfo data, Datum datum); + +/* DESERIALIZATION */ +typedef struct DatumDeserializer DatumDeserializer; +DatumDeserializer *create_datum_deserializer(Oid type); + +/* deserialization from bytes in memory */ +Datum bytes_to_datum_and_advance(DatumDeserializer *deserializer, char **bytes); + +/* deserialization from binary strings (for recv functions) */ +Datum binary_string_to_datum(DatumDeserializer *deserializer, StringInfo data); +Oid binary_string_get_type(StringInfo data); + +#endif diff --git a/tsl/src/compression/segment_meta.c b/tsl/src/compression/segment_meta.c new file mode 100644 index 000000000..53f97f72e --- /dev/null +++ b/tsl/src/compression/segment_meta.c @@ -0,0 +1,258 @@ + +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include +#include + +#include "segment_meta.h" +#include "datum_serialize.h" + +typedef struct SegmentMetaMinMaxBuilder +{ + Oid type_oid; + bool empty; + bool has_null; + + SortSupportData ssup; + bool type_by_val; + int16 type_len; + Datum min; + Datum max; +} SegmentMetaMinMaxBuilder; + +typedef enum SegmentMetaMinMaxVersion +{ + /* Not a real version, if this does get used, it's a bug in the code */ + _INVALID_SEGMENT_MIN_MAX_VERSION = 0, + + SEGMENT_SEGMENT_MIN_MAX_V1, + + /* end of real values */ + _MAX_SEGMENT_MIN_MAX_VERSION = 128, +} SegmentMetaMinMaxVersion; + +typedef enum SegmentMetaMinMaxFlags +{ + /* Has nulls allows us to optimize IS NULL quals */ + HAS_NULLS = (1 << 0), + + /* All Nulls should result in a NULL value for entire SegmentMetaMinMax */ + _MAX_FLAG = (1 << 8), +} SegmentMetaMinMaxFlags; + +/* start must be aligned according to the alignment of the stored type */ +typedef struct SegmentMetaMinMax +{ + char vl_len_[4]; + uint8 version; /* SegmentMetaMinMaxVersion */ + uint8 flags; /* SegmentMetaMinMaxFlags */ + char padding[2]; + Oid type; + /* optional alignment padding for type */ + /*char data[FLEXIBLE_ARRAY_MEMBER]; bound values for two datums with alignment padding in + * between. First datum is min, second is max. Size determined by the datum type or the VARLENA + * header */ +} SegmentMetaMinMax; + +SegmentMetaMinMaxBuilder * +segment_meta_min_max_builder_create(Oid type_oid, Oid collation) +{ + SegmentMetaMinMaxBuilder *builder = palloc(sizeof(*builder)); + TypeCacheEntry *type = lookup_type_cache(type_oid, TYPECACHE_LT_OPR); + + if (!OidIsValid(type->lt_opr)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an less-than operator for type %s", + format_type_be(type_oid)))); + + *builder = (SegmentMetaMinMaxBuilder){ + .type_oid = type_oid, + .empty = true, + .has_null = false, + .type_by_val = type->typbyval, + .type_len = type->typlen, + }; + + builder->ssup.ssup_cxt = CurrentMemoryContext; + builder->ssup.ssup_collation = collation; + builder->ssup.ssup_nulls_first = false; + + PrepareSortSupportFromOrderingOp(type->lt_opr, &builder->ssup); + + return builder; +} + +void +segment_meta_min_max_builder_update_val(SegmentMetaMinMaxBuilder *builder, Datum val) +{ + int cmp; + + if (builder->empty) + { + builder->min = datumCopy(val, builder->type_by_val, builder->type_len); + builder->max = datumCopy(val, builder->type_by_val, builder->type_len); + builder->empty = false; + return; + } + + cmp = ApplySortComparator(builder->min, false, val, false, &builder->ssup); + if (cmp > 0) + builder->min = datumCopy(val, builder->type_by_val, builder->type_len); + + cmp = ApplySortComparator(builder->max, false, val, false, &builder->ssup); + if (cmp < 0) + builder->max = datumCopy(val, builder->type_by_val, builder->type_len); +} + +void +segment_meta_min_max_builder_update_null(SegmentMetaMinMaxBuilder *builder) +{ + builder->has_null = true; +} + +SegmentMetaMinMax * +segment_meta_min_max_builder_finish(SegmentMetaMinMaxBuilder *builder) +{ + SegmentMetaMinMax *res; + uint8 flags = 0; + Size total_size = sizeof(*res); + DatumSerializer *serializer; + char *data; + + if (builder->empty) + return NULL; + + serializer = create_datum_serializer(builder->type_oid); + + if (builder->has_null) + flags |= HAS_NULLS; + + if (builder->type_len == -1) + { + /* detoast if necessary. should never store toast pointers */ + builder->min = PointerGetDatum(PG_DETOAST_DATUM_PACKED(builder->min)); + builder->max = PointerGetDatum(PG_DETOAST_DATUM_PACKED(builder->max)); + } + + total_size = datum_get_bytes_size(serializer, total_size, builder->min); + total_size = datum_get_bytes_size(serializer, total_size, builder->max); + + res = palloc0(total_size); + *res = (SegmentMetaMinMax){ + .version = SEGMENT_SEGMENT_MIN_MAX_V1, + .flags = flags, + .type = builder->type_oid, + }; + + SET_VARSIZE(res, total_size); + + data = (char *) res + sizeof(*res); + total_size -= sizeof(*res); + + data = datum_to_bytes_and_advance(serializer, data, &total_size, builder->min); + data = datum_to_bytes_and_advance(serializer, data, &total_size, builder->max); + + Assert(total_size == 0); + + return res; +} + +static void +segment_meta_min_max_get_deconstruct(SegmentMetaMinMax *meta, DatumDeserializer *deser, Datum *min, + Datum *max) +{ + char *data = (char *) meta + sizeof(*meta); + /* skip the min */ + *min = bytes_to_datum_and_advance(deser, &data); + *max = bytes_to_datum_and_advance(deser, &data); +} + +bytea * +segment_meta_min_max_to_binary_string(SegmentMetaMinMax *meta) +{ + StringInfoData buf; + DatumDeserializer *deser = create_datum_deserializer(meta->type); + DatumSerializer *ser = create_datum_serializer(meta->type); + Datum min, max; + + segment_meta_min_max_get_deconstruct(meta, deser, &min, &max); + pq_begintypsend(&buf); + pq_sendbyte(&buf, meta->version); + pq_sendbyte(&buf, meta->flags); + type_append_to_binary_string(meta->type, &buf); + + datum_append_to_binary_string(ser, &buf, min); + datum_append_to_binary_string(ser, &buf, max); + + return pq_endtypsend(&buf); +} + +SegmentMetaMinMax * +segment_meta_min_max_from_binary_string(StringInfo buf) +{ + uint8 version = pq_getmsgbyte(buf); + + if (version == SEGMENT_SEGMENT_MIN_MAX_V1) + { + uint8 flags = pq_getmsgbyte(buf); + Oid type_oid = binary_string_get_type(buf); + DatumDeserializer *deser = create_datum_deserializer(type_oid); + TypeCacheEntry *type = lookup_type_cache(type_oid, 0); + SegmentMetaMinMaxBuilder builder = (SegmentMetaMinMaxBuilder){ + .type_oid = type_oid, + .empty = false, + .has_null = (flags & HAS_NULLS) != 0, + .type_by_val = type->typbyval, + .type_len = type->typlen, + .min = binary_string_to_datum(deser, buf), + .max = binary_string_to_datum(deser, buf), + }; + + return segment_meta_min_max_builder_finish(&builder); + } + else + elog(ERROR, "Unknown version number for segment meta min max: %d", version); +} + +Datum +tsl_segment_meta_min_max_get_min(Datum meta_datum, Oid type) +{ + SegmentMetaMinMax *meta = (SegmentMetaMinMax *) DatumGetPointer(meta_datum); + DatumDeserializer *deser; + Datum min, max; + + if (type != meta->type) + elog(ERROR, "wrong type requested from segment_meta_min_max"); + + deser = create_datum_deserializer(meta->type); + segment_meta_min_max_get_deconstruct(meta, deser, &min, &max); + return min; +} + +Datum +tsl_segment_meta_min_max_get_max(Datum meta_datum, Oid type) +{ + SegmentMetaMinMax *meta = (SegmentMetaMinMax *) DatumGetPointer(meta_datum); + DatumDeserializer *deser = create_datum_deserializer(meta->type); + Datum min, max; + + if (type != meta->type) + elog(ERROR, "wrong type requested from segment_meta_min_max"); + segment_meta_min_max_get_deconstruct(meta, deser, &min, &max); + return max; +} + +bool +tsl_segment_meta_min_max_has_null(Datum meta_datum) +{ + SegmentMetaMinMax *meta = (SegmentMetaMinMax *) DatumGetPointer(meta_datum); + return (meta->flags & HAS_NULLS) != 0; +} diff --git a/tsl/src/compression/segment_meta.h b/tsl/src/compression/segment_meta.h new file mode 100644 index 000000000..c31458845 --- /dev/null +++ b/tsl/src/compression/segment_meta.h @@ -0,0 +1,39 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_COMPRESSION_SEGMENT_META_H +#define TIMESCALEDB_TSL_COMPRESSION_SEGMENT_META_H + +#include +#include + +typedef struct SegmentMetaMinMax SegmentMetaMinMax; +typedef struct SegmentMetaMinMaxBuilder SegmentMetaMinMaxBuilder; + +SegmentMetaMinMaxBuilder *segment_meta_min_max_builder_create(Oid type, Oid collation); +void segment_meta_min_max_builder_update_val(SegmentMetaMinMaxBuilder *builder, Datum val); +void segment_meta_min_max_builder_update_null(SegmentMetaMinMaxBuilder *builder); +SegmentMetaMinMax *segment_meta_min_max_builder_finish(SegmentMetaMinMaxBuilder *builder); + +Datum tsl_segment_meta_min_max_get_min(Datum meta, Oid type); +Datum tsl_segment_meta_min_max_get_max(Datum meta, Oid type); +bool tsl_segment_meta_min_max_has_null(Datum meta); + +bytea *segment_meta_min_max_to_binary_string(SegmentMetaMinMax *meta); + +SegmentMetaMinMax *segment_meta_min_max_from_binary_string(StringInfo buf); + +static inline bytea * +tsl_segment_meta_min_max_send(Datum arg1) +{ + return segment_meta_min_max_to_binary_string((SegmentMetaMinMax *) DatumGetPointer(arg1)); +} + +static inline Datum +tsl_segment_meta_min_max_recv(StringInfo buf) +{ + return PointerGetDatum(segment_meta_min_max_from_binary_string(buf)); +} +#endif diff --git a/tsl/src/init.c b/tsl/src/init.c index f50132ce4..6cf1121b1 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -35,6 +35,7 @@ #include "hypertable.h" #include "compression/create.h" #include "compression/compress_utils.h" +#include "compression/segment_meta.h" #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; @@ -110,6 +111,11 @@ CrossModuleFunctions tsl_cm_functions = { .process_compress_table = tsl_process_compress_table, .compress_chunk = tsl_compress_chunk, .decompress_chunk = tsl_decompress_chunk, + .segment_meta_min_max_send = tsl_segment_meta_min_max_send, + .segment_meta_min_max_recv = tsl_segment_meta_min_max_recv, + .segment_meta_min_max_get_min = tsl_segment_meta_min_max_get_min, + .segment_meta_min_max_get_max = tsl_segment_meta_min_max_get_max, + .segment_meta_min_max_has_null = tsl_segment_meta_min_max_has_null, }; TS_FUNCTION_INFO_V1(ts_module_init); diff --git a/tsl/test/expected/compression_algos.out b/tsl/test/expected/compression_algos.out index 7614c3363..45ebcfcfc 100644 --- a/tsl/test/expected/compression_algos.out +++ b/tsl/test/expected/compression_algos.out @@ -34,7 +34,7 @@ $$ select (16807 * $1) % 2147483647 $$; create function gen_rand_minstd() returns bigint -language sql as +language sql security definer as $$ update rand_minstd_state set i = rand_minstd_advance(i) returning i $$; diff --git a/tsl/test/expected/compression_hypertable.out b/tsl/test/expected/compression_hypertable.out index 0cb58fa3b..8e43924db 100644 --- a/tsl/test/expected/compression_hypertable.out +++ b/tsl/test/expected/compression_hypertable.out @@ -15,7 +15,7 @@ $$ select (16807 * $1) % 2147483647 $$; create function gen_rand_minstd() returns bigint -language sql as +language sql security definer as $$ update rand_minstd_state set i = rand_minstd_advance(i) returning i $$; diff --git a/tsl/test/expected/compression_segment_meta.out b/tsl/test/expected/compression_segment_meta.out new file mode 100644 index 000000000..05a89e898 --- /dev/null +++ b/tsl/test/expected/compression_segment_meta.out @@ -0,0 +1,275 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_append(internal, ANYELEMENT) + RETURNS internal + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_append' + LANGUAGE C IMMUTABLE PARALLEL SAFE; +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_finish(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_finish' + LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT; +CREATE AGGREGATE _timescaledb_internal.segment_meta_min_max_agg(ANYELEMENT) ( + STYPE = internal, + SFUNC = _timescaledb_internal.tsl_segment_meta_min_max_append, + FINALFUNC = _timescaledb_internal.tsl_segment_meta_min_max_finish +); +\ir include/rand_generator.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license +-------------------------- +-- cheap rand generator -- +-------------------------- +create table rand_minstd_state(i bigint); +create function rand_minstd_advance(bigint) returns bigint +language sql immutable as +$$ + select (16807 * $1) % 2147483647 +$$; +create function gen_rand_minstd() returns bigint +language sql security definer as +$$ + update rand_minstd_state set i = rand_minstd_advance(i) returning i +$$; +-- seed the random num generator +insert into rand_minstd_state values (321); +--use a custom type without send and recv functions to test +--the input/output fallback path. +CREATE TYPE customtype_no_send_recv; +CREATE OR REPLACE FUNCTION customtype_in(cstring) RETURNS customtype_no_send_recv AS +'timestamptz_in' +LANGUAGE internal IMMUTABLE STRICT; +NOTICE: return type customtype_no_send_recv is only a shell +CREATE OR REPLACE FUNCTION customtype_out( customtype_no_send_recv) RETURNS cstring AS +'timestamptz_out' +LANGUAGE internal IMMUTABLE STRICT; +NOTICE: argument type customtype_no_send_recv is only a shell +CREATE TYPE customtype_no_send_recv ( + INPUT = customtype_in, + OUTPUT = customtype_out, + INTERNALLENGTH = 8, + PASSEDBYVALUE, + ALIGNMENT = double, + STORAGE = plain +); +CREATE CAST (customtype_no_send_recv AS bigint) +WITHOUT FUNCTION AS IMPLICIT; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE metric (i int); +insert into metric select i from generate_series(1, 10) i; +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_get_max(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_has_null(meta) +FROM +( + SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) as meta + FROM metric +) AS meta_gen; + segment_meta_min_max_get_min | segment_meta_min_max_get_max | segment_meta_min_max_has_null +------------------------------+------------------------------+------------------------------- + 1 | 10 | f +(1 row) + +\set TYPE int +\set TABLE metric +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +----NULL tests +--First +truncate metric; +insert into metric select NULLIF(i,1) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--Last +truncate metric; +insert into metric select NULLIF(i,10) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--Middle +truncate metric; +insert into metric select NULLIF(i,5) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--All NULLS should return null object +truncate metric; +insert into metric select NULL from generate_series(1, 10) i; +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) is NULL, + _timescaledb_internal.segment_meta_min_max_agg(i)::text is NULL +FROM metric; + ?column? | ?column? +----------+---------- + t | t +(1 row) + +--accessor functions work on NULLs +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_get_max(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_has_null(NULL); + ?column? | ?column? | segment_meta_min_max_has_null +----------+----------+------------------------------- + t | t | t +(1 row) + +-- +--type tests +-- +--untoasted text +CREATE TABLE base_texts AS SELECT + repeat(item::text, 1) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE text +\set TABLE base_texts +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--toasted text +DROP TABLE base_texts; +CREATE TABLE base_texts AS SELECT + repeat(item::text, 100000) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +--make sure it's toasted +SELECT pg_total_relation_size(reltoastrelid) + FROM pg_class c + WHERE relname = 'base_texts'; + pg_total_relation_size +------------------------ + 24576 +(1 row) + +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--name is a fixed-length pass by reference type +CREATE TABLE base_name AS SELECT + item::name as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE name +\set TABLE base_name +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--array +CREATE TABLE text_array AS SELECT + array[item::text, 'abab'] as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE text[] +\set TABLE text_array +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + +--Points doesn't have an ordering so make sure it errors +CREATE TABLE points AS SELECT + point '(0,1)' as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set ON_ERROR_STOP 0 +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) +FROM points; +ERROR: could not identify an less-than operator for type point +\set ON_ERROR_STOP 1 +--test with a custom type with no send/recv +CREATE TABLE customtype_table AS SELECT + item::text::customtype_no_send_recv as i + FROM + (SELECT sub.item from + (SELECT generate_series('2001-01-01 01:01:01', '2001-01-02 01:01:01', INTERVAL '1 hour') item) as sub + ORDER BY gen_rand_minstd() + ) sub; +\set TYPE customtype_no_send_recv +\set TABLE customtype_table +\ir include/compression_test_segment_meta.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\set ECHO errors + min_correct | max_correct | has_null_correct +-------------+-------------+------------------ + t | t | t +(1 row) + diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 941eb76f2..ebbebc84a 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -18,6 +18,7 @@ set(TEST_FILES_DEBUG compression_algos.sql compression_errors.sql compression_hypertable.sql + compression_segment_meta.sql continuous_aggs.sql continuous_aggs_bgw.sql continuous_aggs_materialize.sql diff --git a/tsl/test/sql/compression_segment_meta.sql b/tsl/test/sql/compression_segment_meta.sql new file mode 100644 index 000000000..d8544c839 --- /dev/null +++ b/tsl/test/sql/compression_segment_meta.sql @@ -0,0 +1,190 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license + +\c :TEST_DBNAME :ROLE_SUPERUSER + +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_append(internal, ANYELEMENT) + RETURNS internal + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_append' + LANGUAGE C IMMUTABLE PARALLEL SAFE; + +CREATE OR REPLACE FUNCTION _timescaledb_internal.tsl_segment_meta_min_max_finish(internal) + RETURNS _timescaledb_internal.segment_meta_min_max + AS :TSL_MODULE_PATHNAME, 'tsl_segment_meta_min_max_finish' + LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT; + +CREATE AGGREGATE _timescaledb_internal.segment_meta_min_max_agg(ANYELEMENT) ( + STYPE = internal, + SFUNC = _timescaledb_internal.tsl_segment_meta_min_max_append, + FINALFUNC = _timescaledb_internal.tsl_segment_meta_min_max_finish +); + +\ir include/rand_generator.sql + +--use a custom type without send and recv functions to test +--the input/output fallback path. +CREATE TYPE customtype_no_send_recv; + +CREATE OR REPLACE FUNCTION customtype_in(cstring) RETURNS customtype_no_send_recv AS +'timestamptz_in' +LANGUAGE internal IMMUTABLE STRICT; +CREATE OR REPLACE FUNCTION customtype_out( customtype_no_send_recv) RETURNS cstring AS +'timestamptz_out' +LANGUAGE internal IMMUTABLE STRICT; + +CREATE TYPE customtype_no_send_recv ( + INPUT = customtype_in, + OUTPUT = customtype_out, + INTERNALLENGTH = 8, + PASSEDBYVALUE, + ALIGNMENT = double, + STORAGE = plain +); + +CREATE CAST (customtype_no_send_recv AS bigint) +WITHOUT FUNCTION AS IMPLICIT; + + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER + +CREATE TABLE metric (i int); +insert into metric select i from generate_series(1, 10) i; + +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_get_max(meta, NULL::int), + _timescaledb_internal.segment_meta_min_max_has_null(meta) +FROM +( + SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) as meta + FROM metric +) AS meta_gen; + +\set TYPE int +\set TABLE metric +\ir include/compression_test_segment_meta.sql + +----NULL tests +--First +truncate metric; +insert into metric select NULLIF(i,1) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql + +--Last +truncate metric; +insert into metric select NULLIF(i,10) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql + +--Middle +truncate metric; +insert into metric select NULLIF(i,5) from generate_series(1, 10) i; +\ir include/compression_test_segment_meta.sql + +--All NULLS should return null object +truncate metric; +insert into metric select NULL from generate_series(1, 10) i; + +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) is NULL, + _timescaledb_internal.segment_meta_min_max_agg(i)::text is NULL +FROM metric; + +--accessor functions work on NULLs +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_get_max(NULL, NULL::int) IS NULL, + _timescaledb_internal.segment_meta_min_max_has_null(NULL); + + + +-- +--type tests +-- + +--untoasted text +CREATE TABLE base_texts AS SELECT + repeat(item::text, 1) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE text +\set TABLE base_texts +\ir include/compression_test_segment_meta.sql + +--toasted text +DROP TABLE base_texts; +CREATE TABLE base_texts AS SELECT + repeat(item::text, 100000) as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; +--make sure it's toasted +SELECT pg_total_relation_size(reltoastrelid) + FROM pg_class c + WHERE relname = 'base_texts'; + +\ir include/compression_test_segment_meta.sql + +--name is a fixed-length pass by reference type +CREATE TABLE base_name AS SELECT + item::name as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE name +\set TABLE base_name +\ir include/compression_test_segment_meta.sql + + +--array + +CREATE TABLE text_array AS SELECT + array[item::text, 'abab'] as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE text[] +\set TABLE text_array +\ir include/compression_test_segment_meta.sql + +--Points doesn't have an ordering so make sure it errors +CREATE TABLE points AS SELECT + point '(0,1)' as i + FROM + (SELECT sub.item from + (SELECT generate_series(1, 10) item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set ON_ERROR_STOP 0 +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i) +FROM points; +\set ON_ERROR_STOP 1 + +--test with a custom type with no send/recv + +CREATE TABLE customtype_table AS SELECT + item::text::customtype_no_send_recv as i + FROM + (SELECT sub.item from + (SELECT generate_series('2001-01-01 01:01:01', '2001-01-02 01:01:01', INTERVAL '1 hour') item) as sub + ORDER BY gen_rand_minstd() + ) sub; + +\set TYPE customtype_no_send_recv +\set TABLE customtype_table +\ir include/compression_test_segment_meta.sql diff --git a/tsl/test/sql/include/compression_test_segment_meta.sql b/tsl/test/sql/include/compression_test_segment_meta.sql new file mode 100644 index 000000000..f507331db --- /dev/null +++ b/tsl/test/sql/include/compression_test_segment_meta.sql @@ -0,0 +1,28 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\set ECHO errors + +SELECT 'NULL::'||:'TYPE' as "NULLTYPE" \gset + +SELECT + _timescaledb_internal.segment_meta_min_max_agg(i)::text as "META_TEXT", + min(i) as "TRUE_MIN", + max(i) as "TRUE_MAX", + (count(*)-count(i)) > 0 as "TRUE_HAS_NULL" +FROM :"TABLE" \gset + +SELECT + _timescaledb_internal.segment_meta_min_max_get_min(meta, :NULLTYPE) = :'TRUE_MIN' as min_correct, + _timescaledb_internal.segment_meta_min_max_get_max(meta, :NULLTYPE) = :'TRUE_MAX' as max_correct, + _timescaledb_internal.segment_meta_min_max_has_null(meta) = :'TRUE_HAS_NULL' as has_null_correct +FROM +( + SELECT + :'META_TEXT'::_timescaledb_internal.segment_meta_min_max as meta +) AS meta_gen; + + + +\set ECHO all diff --git a/tsl/test/sql/include/rand_generator.sql b/tsl/test/sql/include/rand_generator.sql index 35d39653d..482924ac2 100644 --- a/tsl/test/sql/include/rand_generator.sql +++ b/tsl/test/sql/include/rand_generator.sql @@ -14,7 +14,7 @@ $$ $$; create function gen_rand_minstd() returns bigint -language sql as +language sql security definer as $$ update rand_minstd_state set i = rand_minstd_advance(i) returning i $$; diff --git a/tsl/test/src/test_compression.c b/tsl/test/src/test_compression.c index a36a6dbb6..e95c1846a 100644 --- a/tsl/test/src/test_compression.c +++ b/tsl/test/src/test_compression.c @@ -28,6 +28,7 @@ #include "compression/gorilla.h" #include "compression/deltadelta.h" #include "compression/utils.h" +#include "compression/segment_meta.h" #define VEC_PREFIX compression_info #define VEC_ELEMENT_TYPE Form_hypertable_compression @@ -543,3 +544,53 @@ ts_decompress_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +TS_FUNCTION_INFO_V1(tsl_segment_meta_min_max_append); + +Datum +tsl_segment_meta_min_max_append(PG_FUNCTION_ARGS) +{ + SegmentMetaMinMaxBuilder *builder = + (SegmentMetaMinMaxBuilder *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + MemoryContext agg_context; + MemoryContext old_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + { + /* cannot be called directly because of internal-type argument */ + elog(ERROR, "tsl_segment_meta_min_max_append called in non-aggregate context"); + } + + old_context = MemoryContextSwitchTo(agg_context); + + if (builder == NULL) + { + Oid type_to_compress = get_fn_expr_argtype(fcinfo->flinfo, 1); + builder = segment_meta_min_max_builder_create(type_to_compress, fcinfo->fncollation); + } + if (PG_ARGISNULL(1)) + segment_meta_min_max_builder_update_null(builder); + else + segment_meta_min_max_builder_update_val(builder, PG_GETARG_DATUM(1)); + + MemoryContextSwitchTo(old_context); + PG_RETURN_POINTER(builder); +} + +TS_FUNCTION_INFO_V1(tsl_segment_meta_min_max_finish); +Datum +tsl_segment_meta_min_max_finish(PG_FUNCTION_ARGS) +{ + SegmentMetaMinMaxBuilder *builder = + (SegmentMetaMinMaxBuilder *) (PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + SegmentMetaMinMax *res; + + if (builder == NULL) + PG_RETURN_NULL(); + + res = segment_meta_min_max_builder_finish(builder); + if (res == NULL) + PG_RETURN_NULL(); + + PG_RETURN_POINTER(res); +}