diff --git a/cmake/ScriptFiles.cmake b/cmake/ScriptFiles.cmake index 3cea2a29e..b9e987955 100644 --- a/cmake/ScriptFiles.cmake +++ b/cmake/ScriptFiles.cmake @@ -27,6 +27,7 @@ set(PRE_INSTALL_FUNCTION_FILES set(SOURCE_FILES hypertable.sql chunk.sql + compression.sql ddl_internal.sql util_time.sql util_internal_table_ddl.sql diff --git a/sql/compression.sql b/sql/compression.sql new file mode 100644 index 000000000..e1bd7aac4 --- /dev/null +++ b/sql/compression.sql @@ -0,0 +1,9 @@ +-- This file and its contents are licensed under the Apache License 2.0. +-- Please see the included NOTICE for copyright information and +-- LICENSE-APACHE for a copy of the license. + +CREATE FUNCTION ts_compressionam_handler(internal) RETURNS table_am_handler +AS '@MODULE_PATHNAME@', 'ts_compressionam_handler' LANGUAGE C; + +CREATE ACCESS METHOD tscompression TYPE TABLE HANDLER ts_compressionam_handler; +COMMENT ON ACCESS METHOD tscompression IS 'TimescaleDB columnar compression'; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index d3f5a12fa..aa9ca0cae 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -1 +1,3 @@ - +-- Hyperstore AM +DROP ACCESS METHOD IF EXISTS tscompression; +DROP FUNCTION IF EXISTS ts_compressionam_handler; diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 886f58aa5..0c3ba54fc 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -78,6 +78,7 @@ CROSSMODULE_WRAPPER(array_compressor_finish); CROSSMODULE_WRAPPER(create_compressed_chunk); CROSSMODULE_WRAPPER(compress_chunk); CROSSMODULE_WRAPPER(decompress_chunk); +CROSSMODULE_WRAPPER(compressionam_handler); /* continuous aggregate */ CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger); @@ -276,6 +277,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .set_rel_pathlist_dml = NULL, .set_rel_pathlist_query = NULL, .set_rel_pathlist = NULL, + .ddl_command_start = NULL, .process_altertable_cmd = NULL, .process_rename_cmd = NULL, @@ -360,6 +362,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .dictionary_compressor_finish = error_no_default_fn_pg_community, .array_compressor_append = error_no_default_fn_pg_community, .array_compressor_finish = error_no_default_fn_pg_community, + .compressionam_handler = error_no_default_fn_pg_community, .show_chunk = error_no_default_fn_pg_community, .create_chunk = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 08548d4cf..e9992bb7d 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -89,6 +89,8 @@ typedef struct CrossModuleFunctions PGFunction reorder_chunk; PGFunction move_chunk; + void (*ddl_command_start)(ProcessUtilityArgs *args); + /* Vectorized queries */ void (*tsl_postprocess_plan)(PlannedStmt *stmt); @@ -143,6 +145,7 @@ typedef struct CrossModuleFunctions PGFunction dictionary_compressor_finish; PGFunction array_compressor_append; PGFunction array_compressor_finish; + PGFunction compressionam_handler; PGFunction create_chunk; PGFunction show_chunk; diff --git a/src/process_utility.c b/src/process_utility.c index ce2f0ee15..21f4c45f0 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -140,6 +140,10 @@ check_chunk_alter_table_operation_allowed(Oid relid, AlterTableStmt *stmt) case AT_SetTableSpace: case AT_ReAddStatistics: case AT_SetCompression: +#if PG15_GE + + case AT_SetAccessMethod: +#endif /* allowed on chunks */ break; case AT_AddConstraint: @@ -4618,6 +4622,14 @@ timescaledb_ddl_command_start(PlannedStmt 
*pstmt, const char *query_string, bool */ result = process_ddl_command_start(&args); + /* + * We need to run tsl-side ddl_command_start hook before + * standard process utility hook to maintain proper invocation + * order of sql_drop and ddl_command_end triggers. + */ + if (ts_cm_functions->ddl_command_start) + ts_cm_functions->ddl_command_start(&args); + /* * We need to run tsl-side ddl_command_start hook before * standard process utility hook to maintain proper invocation diff --git a/src/utils.c b/src/utils.c index 155a9e0b1..2c3a0cf9a 100644 --- a/src/utils.c +++ b/src/utils.c @@ -1802,24 +1802,11 @@ static Oid hypercore_amoid = InvalidOid; bool ts_is_hypercore_am(Oid amoid) { - /* Can't use InvalidOid as an indication of non-cached value since - get_am_oid() will return InvalidOid when the access method does not - exist and we will do the lookup every time this query is called. This - boolean can be removed once we know that there should exist an access - method with the given name. */ - static bool iscached = false; - - if (!iscached && !OidIsValid(hypercore_amoid)) - { - hypercore_amoid = get_am_oid("hypercore", true); - iscached = true; - } - if (!OidIsValid(hypercore_amoid)) - return false; + hypercore_amoid = get_table_am_oid("tscompression", true); - /* Shouldn't get here for now */ - Assert(false); + if (!OidIsValid(amoid) || !OidIsValid(hypercore_amoid)) + return false; return amoid == hypercore_amoid; } diff --git a/test/pgtest/CMakeLists.txt b/test/pgtest/CMakeLists.txt index dbe793fe1..6c02a9ab5 100644 --- a/test/pgtest/CMakeLists.txt +++ b/test/pgtest/CMakeLists.txt @@ -20,7 +20,14 @@ file(READ ${PG_REGRESS_DIR}/parallel_schedule PG_TEST_SCHEDULE) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/testtablespace) # Tests to ignore -set(PG_IGNORE_TESTS database event_trigger opr_sanity sanity_check type_sanity) +set(PG_IGNORE_TESTS + database + event_trigger + opr_sanity + sanity_check + type_sanity + create_am + psql) # Modify the test schedule to ignore some tests foreach(IGNORE_TEST ${PG_IGNORE_TESTS}) diff --git a/tsl/src/compression/CMakeLists.txt b/tsl/src/compression/CMakeLists.txt index 75e8486dd..9fe8ed099 100644 --- a/tsl/src/compression/CMakeLists.txt +++ b/tsl/src/compression/CMakeLists.txt @@ -1,9 +1,11 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/api.c + ${CMAKE_CURRENT_SOURCE_DIR}/arrow_tts.c ${CMAKE_CURRENT_SOURCE_DIR}/compression.c ${CMAKE_CURRENT_SOURCE_DIR}/compression_dml.c ${CMAKE_CURRENT_SOURCE_DIR}/compression_scankey.c ${CMAKE_CURRENT_SOURCE_DIR}/compression_storage.c + ${CMAKE_CURRENT_SOURCE_DIR}/compressionam_handler.c ${CMAKE_CURRENT_SOURCE_DIR}/create.c ${CMAKE_CURRENT_SOURCE_DIR}/segment_meta.c) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index 88da50d3b..03d86606e 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -71,7 +71,7 @@ create_dummy_query() return (Node *) query; } -static void +void compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size, int32 compress_chunk_id, const RelationSize *compress_size, int64 rowcnt_pre_compression, int64 rowcnt_post_compression, diff --git a/tsl/src/compression/api.h b/tsl/src/compression/api.h index 5975a2d52..44c8660da 100644 --- a/tsl/src/compression/api.h +++ b/tsl/src/compression/api.h @@ -7,6 +7,7 @@ #include #include +#include #include "chunk.h" @@ -18,3 +19,10 @@ extern Datum tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS); extern Datum 
tsl_get_compressed_chunk_index_for_recompression( PG_FUNCTION_ARGS); // arg is oid of uncompressed chunk + +extern void compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size, + int32 compress_chunk_id, + const RelationSize *compress_size, + int64 rowcnt_pre_compression, + int64 rowcnt_post_compression, + int64 rowcnt_frozen); diff --git a/tsl/src/compression/arrow_tts.c b/tsl/src/compression/arrow_tts.c new file mode 100644 index 000000000..6fcf7a5f7 --- /dev/null +++ b/tsl/src/compression/arrow_tts.c @@ -0,0 +1,380 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include + +#include "arrow_tts.h" +#include "compression.h" +#include "custom_type_cache.h" +#include "utils/palloc.h" + +/* + * Get a map of attribute offsets that maps non-compressed offsets to + * compressed offsets. + * + * The map is needed since the compressed relation has additional metadata + * columns that the non-compressed relation doesn't have. When adding new + * columns, the new column's attribute number will be higher on the compressed + * relation compared to the regular one. + */ +static const int16 * +arrow_slot_get_attribute_offset_map(TupleTableSlot *slot) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + const TupleDesc tupdesc = slot->tts_tupleDescriptor; + const TupleDesc ctupdesc = aslot->compressed_slot->tts_tupleDescriptor; + + if (NULL == aslot->attrs_offset_map) + { + MemoryContext oldmcxt = MemoryContextSwitchTo(slot->tts_mcxt); + aslot->attrs_offset_map = build_attribute_offset_map(tupdesc, ctupdesc, NULL); + MemoryContextSwitchTo(oldmcxt); + } + + return aslot->attrs_offset_map; +} + +static void +tts_arrow_init(TupleTableSlot *slot) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + aslot->arrow_columns = NULL; + aslot->compressed_slot = NULL; + aslot->segmentby_columns = NULL; + aslot->decompression_mcxt = AllocSetContextCreate(slot->tts_mcxt, + "bulk decompression", + /* minContextSize = */ 0, + /* initBlockSize = */ 64 * 1024, + /* maxBlockSize = */ 64 * 1024); +} + +static void +tts_arrow_release(TupleTableSlot *slot) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + + if (NULL != aslot->arrow_columns) + { + aslot->arrow_columns = NULL; + } +} + +static void +tts_arrow_clear(TupleTableSlot *slot) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + + if (unlikely(TTS_SHOULDFREE(slot))) + { + /* The tuple is materialized, so free materialized memory */ + } + + if (aslot->arrow_columns) + { + for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++) + { + ArrowArray *arr = aslot->arrow_columns[i]; + + if (arr) + { + pfree(arr); + } + + aslot->arrow_columns[i] = NULL; + } + + pfree(aslot->arrow_columns); + aslot->arrow_columns = NULL; + } + + aslot->compressed_slot = NULL; + + slot->tts_nvalid = 0; + slot->tts_flags |= TTS_FLAG_EMPTY; + ItemPointerSetInvalid(&slot->tts_tid); +} + +static inline void +tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *compressed_slot, uint16 tuple_index) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + + slot->tts_flags &= ~TTS_FLAG_EMPTY; + aslot->compressed_slot = compressed_slot; + aslot->tuple_index = tuple_index; + ItemPointerCopy(&compressed_slot->tts_tid, &slot->tts_tid); + slot->tts_tid.ip_blkid.bi_hi = tuple_index; + 
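+	/* The slot's TID is synthetic: the compressed tuple's TID with the row's
+	 * index within that tuple encoded in the high word of the block number.
+	 * compressionam_index_fetch_tuple() reverses this encoding; as noted
+	 * there, this is a temporary hack. */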
Assert(!TTS_EMPTY(aslot->compressed_slot)); +} + +TupleTableSlot * +ExecStoreArrowTuple(TupleTableSlot *slot, TupleTableSlot *compressed_slot, uint16 tuple_index) +{ + Assert(slot != NULL); + Assert(slot->tts_tupleDescriptor != NULL); + Assert(!TTS_EMPTY(compressed_slot)); + + if (unlikely(!TTS_IS_ARROWTUPLE(slot))) + elog(ERROR, "trying to store an arrow tuple in a non-arrow slot"); + + ExecClearTuple(slot); + tts_arrow_store_tuple(slot, compressed_slot, tuple_index); + + Assert(!TTS_EMPTY(slot)); + Assert(!TTS_SHOULDFREE(slot)); + + return slot; +} + +TupleTableSlot * +ExecStoreArrowTupleExisting(TupleTableSlot *slot, uint16 tuple_index) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + + Assert(!TTS_EMPTY(slot)); + aslot->tuple_index = tuple_index; + slot->tts_tid.ip_blkid.bi_hi = aslot->tuple_index; + slot->tts_nvalid = 0; + + return slot; +} + +/* + * Materialize an Arrow slot. + */ +static void +tts_arrow_materialize(TupleTableSlot *slot) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + + Assert(!TTS_EMPTY(slot)); + + TTSOpsBufferHeapTuple.materialize(slot); + + if (aslot->compressed_slot) + ExecMaterializeSlot(aslot->compressed_slot); +} + +static bool +is_compressed_col(const TupleDesc tupdesc, AttrNumber attno) +{ + static CustomTypeInfo *typinfo = NULL; + Oid coltypid = tupdesc->attrs[AttrNumberGetAttrOffset(attno)].atttypid; + + if (typinfo == NULL) + typinfo = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA); + + return coltypid == typinfo->type_oid; +} + +static void +tts_arrow_getsomeattrs(TupleTableSlot *slot, int natts) +{ + ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot; + const TupleDesc tupdesc = slot->tts_tupleDescriptor; + const TupleDesc compressed_tupdesc = aslot->compressed_slot->tts_tupleDescriptor; + const int16 *attrs_map; + int16 cnatts = -1; + + if (natts < 1 || natts > slot->tts_tupleDescriptor->natts) + elog(ERROR, "invalid number of attributes requested"); + + attrs_map = arrow_slot_get_attribute_offset_map(slot); + + /* Find highest attribute number/offset in compressed relation */ + for (int i = 0; i < natts; i++) + { + int16 coff = attrs_map[i]; + + if (AttrOffsetGetAttrNumber(coff) > cnatts) + cnatts = AttrOffsetGetAttrNumber(coff); + } + + Assert(cnatts > 0); + + slot_getsomeattrs(aslot->compressed_slot, cnatts); + + if (NULL == aslot->arrow_columns) + { + aslot->arrow_columns = + MemoryContextAllocZero(slot->tts_mcxt, + sizeof(ArrowArray *) * slot->tts_tupleDescriptor->natts); + } + + for (int i = 0; i < natts; i++) + { + const int16 cattoff = attrs_map[i]; + const AttrNumber attno = AttrOffsetGetAttrNumber(i); + const AttrNumber cattno = AttrOffsetGetAttrNumber(cattoff); + + /* Decompress the column if not already done.
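+		 * Decompressed columns are cached in aslot->arrow_columns and reused
+		 * for every row of the same compressed tuple, so each column is
+		 * decompressed once per compressed tuple rather than once per row.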
*/ + if (aslot->arrow_columns[i] == NULL) + { + if (is_compressed_col(compressed_tupdesc, cattno)) + { + bool isnull; + Datum value = slot_getattr(aslot->compressed_slot, cattno, &isnull); + + if (isnull) + { + // do nothing + } + else + { + const Form_pg_attribute attr = &tupdesc->attrs[i]; + const CompressedDataHeader *header = + (CompressedDataHeader *) PG_DETOAST_DATUM(value); + DecompressAllFunction decompress_all = + tsl_get_decompress_all_function(header->compression_algorithm, + attr->atttypid); + Assert(decompress_all != NULL); + MemoryContext oldcxt = MemoryContextSwitchTo(aslot->decompression_mcxt); + aslot->arrow_columns[i] = + decompress_all(PointerGetDatum(header), + slot->tts_tupleDescriptor->attrs[i].atttypid, + slot->tts_mcxt); + MemoryContextReset(aslot->decompression_mcxt); + MemoryContextSwitchTo(oldcxt); + } + } + else + { + /* Since we are looping over the attributes of the + * non-compressed slot, we will either see only compressed + * columns or the segment-by column. If the column is not + * compressed, it must be the segment-by columns. The + * segment-by column is not compressed and the value is the + * same for all rows in the compressed tuple. */ + aslot->arrow_columns[i] = NULL; + slot->tts_values[i] = + slot_getattr(aslot->compressed_slot, cattno, &slot->tts_isnull[i]); + + /* Remember the segment-by column */ + MemoryContext oldcxt = MemoryContextSwitchTo(slot->tts_mcxt); + aslot->segmentby_columns = bms_add_member(aslot->segmentby_columns, attno); + MemoryContextSwitchTo(oldcxt); + } + } + + /* At this point the column should be decompressed, if it is a + * compressed column. */ + if (bms_is_member(attno, aslot->segmentby_columns)) + { + /* Segment-by column. Value already set. */ + } + else if (aslot->arrow_columns[i] == NULL) + { + /* Since the column is not the segment-by column, and there is no + * decompressed data, the column must be NULL. Use the default + * value. */ + slot->tts_values[i] = getmissingattr(slot->tts_tupleDescriptor, + AttrOffsetGetAttrNumber(i), + &slot->tts_isnull[i]); + } + else + { + const char *restrict values = aslot->arrow_columns[i]->buffers[1]; + const uint64 *restrict validity = aslot->arrow_columns[i]->buffers[0]; + int16 value_bytes = get_typlen(slot->tts_tupleDescriptor->attrs[i].atttypid); + + /* + * The conversion of Datum to more narrow types will truncate + * the higher bytes, so we don't care if we read some garbage + * into them, and can always read 8 bytes. These are unaligned + * reads, so technically we have to do memcpy. + */ + uint64 value; + memcpy(&value, &values[value_bytes * aslot->tuple_index], 8); + +#ifdef USE_FLOAT8_BYVAL + Datum datum = Int64GetDatum(value); +#else + /* + * On 32-bit systems, the data larger than 4 bytes go by + * reference, so we have to jump through these hoops. 
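+			 * (USE_FLOAT8_BYVAL is defined on 64-bit builds, where 8-byte
+			 * values are passed by value and fit directly in a Datum.)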
+ */ + Datum datum; + if (value_bytes <= 4) + { + datum = Int32GetDatum((uint32) value); + } + else + { + datum = Int64GetDatum(value); + } +#endif + slot->tts_values[i] = datum; + slot->tts_isnull[i] = !arrow_row_is_valid(validity, aslot->tuple_index); + } + } + + slot->tts_nvalid = natts; +} + +/* + * Arrow slots do not carry system columns (ctid, xmin, etc.), so any + * attempt to read one is an error. + */ +static Datum +tts_arrow_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull) +{ + Assert(!TTS_EMPTY(slot)); + + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot retrieve a system column in this context"))); + + return 0; /* silence compiler warnings */ +} + +static void +tts_arrow_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot) +{ + tts_arrow_clear(dstslot); + slot_getallattrs(srcslot); + + /* Copy over the source slot's column values before materializing */ + memcpy(dstslot->tts_values, srcslot->tts_values, sizeof(Datum) * srcslot->tts_nvalid); + memcpy(dstslot->tts_isnull, srcslot->tts_isnull, sizeof(bool) * srcslot->tts_nvalid); + dstslot->tts_nvalid = srcslot->tts_nvalid; + dstslot->tts_flags &= ~TTS_FLAG_EMPTY; + + /* make sure storage doesn't depend on external memory */ + tts_arrow_materialize(dstslot); +} + +static HeapTuple +tts_arrow_copy_heap_tuple(TupleTableSlot *slot) +{ + HeapTuple tuple; + + Assert(!TTS_EMPTY(slot)); + + tts_arrow_materialize(slot); + tuple = heap_form_tuple(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull); + ItemPointerCopy(&slot->tts_tid, &tuple->t_self); + + return tuple; +} + +static MinimalTuple +tts_arrow_copy_minimal_tuple(TupleTableSlot *slot) +{ + Assert(!TTS_EMPTY(slot)); + tts_arrow_materialize(slot); + + return heap_form_minimal_tuple(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull); +} + +const TupleTableSlotOps TTSOpsArrowTuple = { .base_slot_size = sizeof(ArrowTupleTableSlot), + .init = tts_arrow_init, + .release = tts_arrow_release, + .clear = tts_arrow_clear, + .getsomeattrs = tts_arrow_getsomeattrs, + .getsysattr = tts_arrow_getsysattr, + .materialize = tts_arrow_materialize, + .copyslot = tts_arrow_copyslot, + .get_heap_tuple = NULL, + .get_minimal_tuple = NULL, + .copy_heap_tuple = tts_arrow_copy_heap_tuple, + .copy_minimal_tuple = tts_arrow_copy_minimal_tuple }; diff --git a/tsl/src/compression/arrow_tts.h b/tsl/src/compression/arrow_tts.h new file mode 100644 index 000000000..2f28f1363 --- /dev/null +++ b/tsl/src/compression/arrow_tts.h @@ -0,0 +1,96 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license.
+ */ +#ifndef PG_ARROW_TUPTABLE_H +#define PG_ARROW_TUPTABLE_H + +#include +#include +#include +#include +#include + +#include "arrow_c_data_interface.h" +#include "compression/create.h" +#include "nodes/decompress_chunk/detoaster.h" + +typedef struct ArrowTupleTableSlot +{ + BufferHeapTupleTableSlot base; + TupleTableSlot *compressed_slot; + ArrowArray **arrow_columns; + uint16 tuple_index; /* Index of this particular tuple in the compressed (columnar data) tuple */ + MemoryContext decompression_mcxt; + Bitmapset *segmentby_columns; + char *data; + int16 *attrs_offset_map; +} ArrowTupleTableSlot; + +extern const TupleTableSlotOps TTSOpsArrowTuple; + +static inline int16 * +build_attribute_offset_map(const TupleDesc tupdesc, const TupleDesc ctupdesc, + AttrNumber *count_attno) +{ + int16 *attrs_offset_map = palloc0(sizeof(int16) * tupdesc->natts); + + for (int i = 0; i < tupdesc->natts; i++) + { + const Form_pg_attribute attr = TupleDescAttr(tupdesc, i); + + if (attr->attisdropped) + { + attrs_offset_map[i] = -1; + } + else + { + bool found = false; + + for (int j = 0; j < ctupdesc->natts; j++) + { + const Form_pg_attribute cattr = TupleDescAttr(ctupdesc, j); + + if (!cattr->attisdropped && + namestrcmp(&attr->attname, NameStr(cattr->attname)) == 0) + { + attrs_offset_map[i] = AttrNumberGetAttrOffset(cattr->attnum); + found = true; + break; + } + } + + Ensure(found, "missing attribute in compressed relation"); + } + } + + if (count_attno) + { + *count_attno = InvalidAttrNumber; + + /* Find the count column attno */ + for (int i = 0; i < ctupdesc->natts; i++) + { + const Form_pg_attribute cattr = TupleDescAttr(ctupdesc, i); + + if (namestrcmp(&cattr->attname, COMPRESSION_COLUMN_METADATA_COUNT_NAME) == 0) + { + *count_attno = cattr->attnum; + break; + } + } + + Assert(*count_attno != InvalidAttrNumber); + } + + return attrs_offset_map; +} + +extern TupleTableSlot *ExecStoreArrowTuple(TupleTableSlot *slot, TupleTableSlot *compressed_slot, + uint16 tuple_index); +extern TupleTableSlot *ExecStoreArrowTupleExisting(TupleTableSlot *slot, uint16 tuple_index); + +#define TTS_IS_ARROWTUPLE(slot) ((slot)->tts_ops == &TTSOpsArrowTuple) + +#endif /* PG_ARROW_TUPTABLE_H */ diff --git a/tsl/src/compression/compressionam_handler.c b/tsl/src/compression/compressionam_handler.c new file mode 100644 index 000000000..db218ad98 --- /dev/null +++ b/tsl/src/compression/compressionam_handler.c @@ -0,0 +1,1191 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "compression.h" +#include "compression/api.h" +#include "compression/arrow_tts.h" +#include "compressionam_handler.h" +#include "create.h" +#include "trigger.h" +#include "ts_catalog/array_utils.h" +#include "ts_catalog/catalog.h" +#include "ts_catalog/compression_settings.h" + +static const TableAmRoutine compressionam_methods; +static void compressionam_handler_end_conversion(Oid relid); + +static Oid +get_compressed_chunk_relid(Oid chunk_relid) +{ + Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true); + return ts_chunk_get_relid(chunk->fd.compressed_chunk_id, false); +} + +/* ------------------------------------------------------------------------ + * Slot related callbacks for compression AM + * ------------------------------------------------------------------------ + */ +static const TupleTableSlotOps * +compressionam_slot_callbacks(Relation relation) +{ + return &TTSOpsArrowTuple; +} + +#define FEATURE_NOT_SUPPORTED \ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("%s not supported", __func__))) + +#define FUNCTION_DOES_NOTHING \ + ereport(WARNING, \ + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ + errmsg("%s does not do anything yet", __func__))) + +#define pgstat_count_compression_scan(rel) pgstat_count_heap_scan(rel) + +#define pgstat_count_compression_getnext(rel) pgstat_count_heap_getnext(rel) + +typedef struct CompressionInfoData +{ + CompressionSettings *settings; + int16 *column_offsets; + Bitmapset *segmentby_cols; + Bitmapset *orderby_cols; + int hypertable_id; + int num_columns; + int num_segmentby; + int num_orderby; + int num_keys; +} CompressionInfoData; + +static CompressionInfoData * +build_compression_info_data(Relation rel) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + CompressionInfoData *cdata = palloc0(sizeof(CompressionInfoData)); + CompressionSettings *settings; + int32 hyper_id = ts_chunk_get_hypertable_id_by_reloid(rel->rd_id); + Oid hyper_relid = ts_hypertable_id_to_relid(hyper_id, false); + + settings = ts_compression_settings_get(hyper_relid); + Ensure(settings, + "no compression settings for relation %s", + get_rel_name(RelationGetRelid(rel))); + + cdata->num_columns = tupdesc->natts; + cdata->column_offsets = palloc0(sizeof(int) * tupdesc->natts); + cdata->hypertable_id = hyper_id; + cdata->settings = settings; + + for (int i = 0; i < cdata->num_columns; i++) + { + const Form_pg_attribute attr = &tupdesc->attrs[i]; + bool is_segmentby = ts_array_is_member(settings->fd.segmentby, NameStr(attr->attname)); + bool is_orderby = ts_array_is_member(settings->fd.orderby, NameStr(attr->attname)); + + cdata->column_offsets[i] = AttrNumberGetAttrOffset(attr->attnum); + + if (is_segmentby) + { + cdata->num_segmentby += 1; + cdata->segmentby_cols = bms_add_member(cdata->segmentby_cols, attr->attnum); + } + + if (is_orderby) + { + cdata->num_orderby += 1; + cdata->orderby_cols = bms_add_member(cdata->orderby_cols, attr->attnum); + } + + if (is_segmentby || is_orderby) + cdata->num_keys += 1; + } + + Assert(cdata->num_segmentby == ts_array_length(settings->fd.segmentby)); + Assert(cdata->num_orderby == ts_array_length(settings->fd.orderby)); + + return cdata; +} + +typedef struct CompressionScanDescData +{ + TableScanDescData rs_base; + TableScanDesc heap_scan; + 
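+	/* Scan state for the compressed side of the chunk. A sequential scan
+	 * first returns decompressed rows from here, then falls back to
+	 * heap_scan for any not-yet-compressed rows. */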
Relation compressed_rel; + TupleTableSlot *compressed_slot; + TableScanDesc compressed_scan_desc; + uint16 compressed_tuple_index; + int64 returned_row_count; + int32 compressed_row_count; + AttrNumber count_colattno; + CompressionInfoData *cdata; + int16 *attrs_map; + bool compressed_read_done; +} CompressionScanDescData; + +typedef struct CompressionScanDescData *CompressionScanDesc; + +/* + * Initialization common for beginscan and rescan. + */ +static void +initscan(CompressionScanDesc scan, ScanKey key) +{ + if (key != NULL && scan->rs_base.rs_nkeys > 0) + memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData)); + + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN) + pgstat_count_compression_scan(scan->rs_base.rs_rd); +} + +static TableScanDesc +compressionam_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, uint32 flags) +{ + CompressionScanDesc scan; + const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + + RelationIncrementReferenceCount(relation); + + scan = palloc0(sizeof(CompressionScanDescData)); + scan->rs_base.rs_rd = relation; + scan->rs_base.rs_snapshot = snapshot; + scan->rs_base.rs_nkeys = nkeys; + scan->rs_base.rs_flags = flags; + scan->rs_base.rs_parallel = parallel_scan; + + Chunk *chunk = ts_chunk_get_by_relid(RelationGetRelid(relation), true); + Chunk *c_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true); + + scan->compressed_rel = table_open(c_chunk->table_id, AccessShareLock); + scan->compressed_tuple_index = 0; + scan->compressed_slot = table_slot_create(scan->compressed_rel, NULL); + scan->returned_row_count = 0; + scan->compressed_row_count = 0; + + TupleDesc tupdesc = RelationGetDescr(relation); + TupleDesc c_tupdesc = RelationGetDescr(scan->compressed_rel); + + scan->attrs_map = build_attribute_offset_map(tupdesc, c_tupdesc, &scan->count_colattno); + scan->cdata = build_compression_info_data(relation); + + if (nkeys > 0) + scan->rs_base.rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys); + else + scan->rs_base.rs_key = NULL; + + initscan(scan, key); + + if (flags & SO_TYPE_ANALYZE) + scan->compressed_scan_desc = table_beginscan_analyze(scan->compressed_rel); + else + scan->compressed_scan_desc = table_beginscan(scan->compressed_rel, snapshot, 0, NULL); + + scan->heap_scan = heapam->scan_begin(relation, snapshot, nkeys, key, parallel_scan, flags); + + return &scan->rs_base; +} + +static void +compressionam_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_strat, + bool allow_sync, bool allow_pagemode) +{ + CompressionScanDesc scan = (CompressionScanDesc) sscan; + const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + + initscan(scan, key); + scan->compressed_tuple_index = 0; + table_rescan(scan->compressed_scan_desc, NULL); + heapam->scan_rescan(scan->heap_scan, key, set_params, allow_strat, allow_sync, allow_pagemode); +} + +static void +compressionam_endscan(TableScanDesc sscan) +{ + CompressionScanDesc scan = (CompressionScanDesc) sscan; + const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + + RelationDecrementReferenceCount(sscan->rs_rd); + ExecDropSingleTupleTableSlot(scan->compressed_slot); + table_endscan(scan->compressed_scan_desc); + table_close(scan->compressed_rel, AccessShareLock); + heapam->scan_end(scan->heap_scan); + + if (scan->rs_base.rs_key) + pfree(scan->rs_base.rs_key); + + pfree(scan->attrs_map); + pfree(scan); +} + +static bool +compressionam_getnextslot(TableScanDesc sscan, ScanDirection direction, 
TupleTableSlot *slot) +{ + CompressionScanDesc scan = (CompressionScanDesc) sscan; + + if (scan->compressed_read_done) + { + const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + return heapam->scan_getnextslot(scan->heap_scan, direction, slot); + } + + Assert(scan->compressed_tuple_index <= scan->compressed_row_count); + + if (TupIsNull(scan->compressed_slot) || + (scan->compressed_tuple_index + 1 == scan->compressed_row_count)) + { + if (!table_scan_getnextslot(scan->compressed_scan_desc, direction, scan->compressed_slot)) + { + ExecClearTuple(slot); + scan->compressed_read_done = true; + return false; + } + + bool isnull; + Datum count = slot_getattr(scan->compressed_slot, scan->count_colattno, &isnull); + Assert(!isnull); + scan->compressed_row_count = DatumGetInt32(count); + scan->compressed_tuple_index = 0; + ExecStoreArrowTuple(slot, scan->compressed_slot, scan->compressed_tuple_index); + } + else + { + scan->compressed_tuple_index++; + ExecStoreArrowTupleExisting(slot, scan->compressed_tuple_index); + } + + Assert(!TTS_EMPTY(scan->compressed_slot)); + pgstat_count_compression_getnext(sscan->rs_rd); + + return true; +} + +static void +compressionam_get_latest_tid(TableScanDesc sscan, ItemPointer tid) +{ + FEATURE_NOT_SUPPORTED; +} + +static void +compressionam_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, + int options, BulkInsertStateData *bistate) +{ + FEATURE_NOT_SUPPORTED; +} + +typedef struct IndexFetchComprData +{ + IndexFetchTableData h_base; /* AM independent part of the descriptor */ + IndexFetchTableData *compr_hscan; + Relation compr_rel; + TupleTableSlot *compressed_slot; + ItemPointerData tid; + int64 num_decompressions; +} IndexFetchComprData; + +/* ------------------------------------------------------------------------ + * Index Scan Callbacks for compression AM + * ------------------------------------------------------------------------ + */ +static IndexFetchTableData * +compressionam_index_fetch_begin(Relation rel) +{ + IndexFetchComprData *cscan = palloc0(sizeof(IndexFetchComprData)); + Oid cchunk_relid = get_compressed_chunk_relid(RelationGetRelid(rel)); + Relation crel = table_open(cchunk_relid, AccessShareLock); + + cscan->h_base.rel = rel; + cscan->compr_rel = crel; + cscan->compressed_slot = table_slot_create(crel, NULL); + cscan->compr_hscan = crel->rd_tableam->index_fetch_begin(crel); + ItemPointerSetInvalid(&cscan->tid); + + return &cscan->h_base; +} + +static void +compressionam_index_fetch_reset(IndexFetchTableData *scan) +{ + IndexFetchComprData *cscan = (IndexFetchComprData *) scan; + + ItemPointerSetInvalid(&cscan->tid); + cscan->compr_rel->rd_tableam->index_fetch_reset(cscan->compr_hscan); +} + +static void +compressionam_index_fetch_end(IndexFetchTableData *scan) +{ + IndexFetchComprData *cscan = (IndexFetchComprData *) scan; + Relation crel = cscan->compr_rel; + + crel->rd_tableam->index_fetch_end(cscan->compr_hscan); + ExecDropSingleTupleTableSlot(cscan->compressed_slot); + table_close(crel, AccessShareLock); + pfree(cscan); +} + +static bool +compressionam_index_fetch_tuple(struct IndexFetchTableData *scan, ItemPointer tid, + Snapshot snapshot, TupleTableSlot *slot, bool *call_again, + bool *all_dead) +{ + IndexFetchComprData *cscan = (IndexFetchComprData *) scan; + Relation crel = cscan->compr_rel; + ItemPointerData compr_tid; + + /* Read the compressed tuple index out of the TID */ + uint16 compr_index = tid->ip_blkid.bi_hi; + + /* Recreate the original TID for the compressed table */ + 
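+	/* (bi_hi carries the row index set by tts_arrow_store_tuple(); clearing
+	 * it below restores the compressed tuple's physical TID) */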
ItemPointerCopy(tid, &compr_tid); + compr_tid.ip_blkid.bi_hi = 0; + + /* + * Avoid decompression if the new TID from the index points to the same + * compressed tuple as the previous call to this function. + * + * There are cases, however, where the index scan jumps between the same + * compressed tuples to get the right order, which will lead to + * decompressing the same compressed tuple multiple times. This happens, + * for example, when there's a segmentby column and orderby on + * time. Returning data in time order requires interleaving rows from two + * or more compressed tuples with different segmentby values. It is + * possible to optimize that case further by retaining a window/cache of + * decompressed tuples, keyed on TID. + */ + if (!TTS_EMPTY(slot) && ItemPointerEquals(&cscan->tid, &compr_tid)) + { + /* Still in the same compressed tuple, so just update tuple index and + * return the same Arrow slot */ + ExecStoreArrowTupleExisting(slot, compr_index); + ItemPointerCopy(tid, &slot->tts_tid); + return true; + } + + bool result = crel->rd_tableam->index_fetch_tuple(cscan->compr_hscan, + &compr_tid, + snapshot, + cscan->compressed_slot, + call_again, + all_dead); + + if (result) + { + ExecStoreArrowTuple(slot, cscan->compressed_slot, compr_index); + /* Save the current compressed TID */ + ItemPointerCopy(&compr_tid, &cscan->tid); + cscan->num_decompressions++; + } + + return result; +} + +/* ------------------------------------------------------------------------ + * Callbacks for non-modifying operations on individual tuples for compression AM + * ------------------------------------------------------------------------ + */ + +static bool +compressionam_fetch_row_version(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +static bool +compressionam_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +static bool +compressionam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, Snapshot snapshot) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +#if PG14_GE +static TransactionId +compressionam_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate) +{ + return 0; +} +#endif + +/* ---------------------------------------------------------------------------- + * Functions for manipulations of physical tuples for compression AM.
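+ * Plain inserts go straight to the uncompressed heap; rows are routed into
+ * a tuplesort only during an ALTER TABLE ... SET ACCESS METHOD conversion
+ * (see ConversionState below).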
+ * ---------------------------------------------------------------------------- + */ + +typedef struct ConversionState +{ + Oid relid; + Tuplesortstate *tuplesortstate; + CompressionInfoData *cdata; +} ConversionState; + +static ConversionState *conversionstate = NULL; + +static void +compressionam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, + BulkInsertStateData *bistate) +{ + if (conversionstate) + tuplesort_puttupleslot(conversionstate->tuplesortstate, slot); + else + { + const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + heapam->tuple_insert(relation, slot, cid, options, bistate); + } +} + +static void +compressionam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, CommandId cid, + int options, BulkInsertStateData *bistate, uint32 specToken) +{ + FEATURE_NOT_SUPPORTED; +} + +static void +compressionam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, uint32 specToken, + bool succeeded) +{ + FEATURE_NOT_SUPPORTED; +} + +static TM_Result +compressionam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot snapshot, + Snapshot crosscheck, bool wait, TM_FailureData *tmfd, bool changingPart) +{ + FEATURE_NOT_SUPPORTED; + return TM_Ok; +} + +#if PG16_LT +typedef bool TU_UpdateIndexes; +#endif + +static TM_Result +compressionam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, CommandId cid, + Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, + LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes) +{ + FEATURE_NOT_SUPPORTED; + return TM_Ok; +} + +static TM_Result +compressionam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd) +{ + FEATURE_NOT_SUPPORTED; + return TM_Ok; +} + +static void +compressionam_finish_bulk_insert(Relation rel, int options) +{ + compressionam_handler_end_conversion(rel->rd_id); +} + +/* ------------------------------------------------------------------------ + * DDL related callbacks for compression AM. 
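+ * (PG16 renamed RelFileNode to RelFileLocator and the corresponding
+ * relation_set_new_filenode callback to relation_set_new_filelocator;
+ * the compat definitions below let one implementation serve both.)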
+ * ------------------------------------------------------------------------ + */ + +#if PG16_LT +/* Account for API differences in pre-PG16 versions */ +typedef RelFileNode RelFileLocator; +#define relation_set_new_filelocator relation_set_new_filenode +#endif + +static void +compressionam_relation_set_new_filelocator(Relation rel, const RelFileLocator *newrlocator, + char persistence, TransactionId *freezeXid, + MultiXactId *minmulti) +{ + const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + + heapam->relation_set_new_filelocator(rel, newrlocator, persistence, freezeXid, minmulti); +} + +static void +compressionam_relation_nontransactional_truncate(Relation rel) +{ + RelationTruncate(rel, 0); +} + +static void +compressionam_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) +{ + FEATURE_NOT_SUPPORTED; +} + +static void +compressionam_relation_copy_for_cluster(Relation OldCompression, Relation NewCompression, + Relation OldIndex, bool use_sort, TransactionId OldestXmin, + TransactionId *xid_cutoff, MultiXactId *multi_cutoff, + double *num_tuples, double *tups_vacuumed, + double *tups_recently_dead) +{ + FEATURE_NOT_SUPPORTED; +} + +static void +compressionam_vacuum_rel(Relation rel, VacuumParams *params, BufferAccessStrategy bstrategy) +{ + Oid cchunk_relid = get_compressed_chunk_relid(RelationGetRelid(rel)); + LOCKMODE lmode = + (params->options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock; + Relation crel = vacuum_open_relation(cchunk_relid, + NULL, + params->options, + params->log_min_duration >= 0, + lmode); + + pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, RelationGetRelid(rel)); + + /* Vacuum the uncompressed relation */ + // const TableAmRoutine *heapam = GetHeapamTableAmRoutine(); + // heapam->relation_vacuum(rel, params, bstrategy); + + /* The compressed relation can be vacuumed too, but might not need it + * unless we do a lot of insert/deletes of compressed rows */ + crel->rd_tableam->relation_vacuum(crel, params, bstrategy); + table_close(crel, NoLock); +} + +static bool +compressionam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, + BufferAccessStrategy bstrategy) +{ + CompressionScanDescData *cscan = (CompressionScanDescData *) scan; + + return cscan->compressed_rel->rd_tableam->scan_analyze_next_block(cscan->compressed_scan_desc, + blockno, + bstrategy); +} + +/* + * Sample from the compressed chunk. + * + * TODO: this needs more work and it is not clear that this is the best way to + * analyze. + */ +static bool +compressionam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, + double *liverows, double *deadrows, TupleTableSlot *slot) +{ + CompressionScanDescData *cscan = (CompressionScanDescData *) scan; + + bool result = + cscan->compressed_rel->rd_tableam->scan_analyze_next_tuple(cscan->compressed_scan_desc, + OldestXmin, + liverows, + deadrows, + cscan->compressed_slot); + + if (result) + { + ExecStoreArrowTuple(slot, cscan->compressed_slot, 0); + } + else + { + ExecClearTuple(slot); + } + + return result; +} + +typedef struct IndexCallbackState +{ + IndexBuildCallback callback; + CompressionInfoData *cdata; + IndexInfo *index_info; + EState *estate; + void *orig_state; + int16 tuple_index; + double ntuples; + Datum *values; + bool *isnull; + MemoryContext decompression_mcxt; + ArrowArray **arrow_columns; + int16 *attrs_map; + AttrNumber count_cattno; +} IndexCallbackState; + +/* + * TODO: need to rerun filters on uncompressed tuples. 
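+ * Index predicates are currently cleared before scanning the compressed
+ * relation (see index_build_range_scan below), since they cannot be
+ * evaluated against compressed column data.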
+ */ +static void +compression_index_build_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, + bool tupleIsAlive, void *state) +{ + IndexCallbackState *icstate = state; + // bool checking_uniqueness = (callback_state->index_info->ii_Unique || + // callback_state->index_info->ii_ExclusionOps != NULL); + int num_rows = -1; + TupleDesc idesc = RelationGetDescr(index); + + for (int i = 0; i < icstate->index_info->ii_NumIndexAttrs; i++) + { + const AttrNumber attno = icstate->index_info->ii_IndexAttrNumbers[i]; + + if (bms_is_member(attno, icstate->cdata->segmentby_cols)) + { + // Segment by column, nothing to decompress. Just return the value + // from the compressed chunk since it is the same for every row in + // the compressed tuple. + int countoff = icstate->index_info->ii_NumIndexAttrs; + Assert(num_rows == -1 || num_rows == DatumGetInt32(values[countoff])); + num_rows = DatumGetInt32(values[countoff]); + } + else + { + if (isnull[i]) + { + // do nothing + } + else + { + const Form_pg_attribute attr = &idesc->attrs[i]; + const CompressedDataHeader *header = + (CompressedDataHeader *) PG_DETOAST_DATUM(values[i]); + DecompressAllFunction decompress_all = + tsl_get_decompress_all_function(header->compression_algorithm, attr->atttypid); + Assert(decompress_all != NULL); + MemoryContext oldcxt = MemoryContextSwitchTo(icstate->decompression_mcxt); + icstate->arrow_columns[i] = + decompress_all(PointerGetDatum(header), idesc->attrs[i].atttypid, oldcxt); + Assert(num_rows == -1 || num_rows == icstate->arrow_columns[i]->length); + num_rows = icstate->arrow_columns[i]->length; + MemoryContextReset(icstate->decompression_mcxt); + MemoryContextSwitchTo(oldcxt); + } + } + } + + Assert(num_rows > 0); + + for (int rownum = 0; rownum < num_rows; rownum++) + { + for (int colnum = 0; colnum < icstate->index_info->ii_NumIndexAttrs; colnum++) + { + const AttrNumber table_attno = icstate->index_info->ii_IndexAttrNumbers[colnum]; + + if (bms_is_member(table_attno, icstate->cdata->segmentby_cols)) + { + // Segment by column + } + else + { + const char *restrict arrow_values = icstate->arrow_columns[colnum]->buffers[1]; + const uint64 *restrict validity = icstate->arrow_columns[colnum]->buffers[0]; + int16 value_bytes = get_typlen(idesc->attrs[colnum].atttypid); + + /* + * The conversion of Datum to more narrow types will truncate + * the higher bytes, so we don't care if we read some garbage + * into them, and can always read 8 bytes. These are unaligned + * reads, so technically we have to do memcpy. + */ + uint64 value; + memcpy(&value, &arrow_values[value_bytes * rownum], 8); + +#ifdef USE_FLOAT8_BYVAL + Datum datum = Int64GetDatum(value); +#else + /* + * On 32-bit systems, the data larger than 4 bytes go by + * reference, so we have to jump through these hoops. + */ + Datum datum; + if (value_bytes <= 4) + { + datum = Int32GetDatum((uint32) value); + } + else + { + datum = Int64GetDatum(value); + } +#endif + values[colnum] = datum; + isnull[colnum] = !arrow_row_is_valid(validity, rownum); + } + } + + // Encode compressed tuple index (rownum) into high blocknumber. This + // is currently a hack and we should probably use less bits. 
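+			// The encoding must match tts_arrow_store_tuple() and
+			// compressionam_index_fetch_tuple(), which decode bi_hi back into
+			// a row index.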
+ tid->ip_blkid.bi_hi = (uint16) rownum; + icstate->callback(index, tid, values, isnull, tupleIsAlive, icstate->orig_state); + } +} + +static double +compressionam_index_build_range_scan(Relation relation, Relation indexRelation, + IndexInfo *indexInfo, bool allow_sync, bool anyvisible, + bool progress, BlockNumber start_blockno, + BlockNumber numblocks, IndexBuildCallback callback, + void *callback_state, TableScanDesc scan) +{ + Oid cchunk_relid = get_compressed_chunk_relid(RelationGetRelid(relation)); + Relation crel = table_open(cchunk_relid, AccessShareLock); + AttrNumber count_cattno = InvalidAttrNumber; + IndexCallbackState icstate = { + .callback = callback, + .orig_state = callback_state, + .estate = CreateExecutorState(), + .index_info = indexInfo, + .tuple_index = -1, + .ntuples = 0, + .decompression_mcxt = AllocSetContextCreate(CurrentMemoryContext, + "bulk decompression", + /* minContextSize = */ 0, + /* initBlockSize = */ 64 * 1024, + /* maxBlockSize = */ 64 * 1024), + .cdata = build_compression_info_data(relation), + .attrs_map = build_attribute_offset_map(RelationGetDescr(relation), + RelationGetDescr(crel), + &count_cattno), + .arrow_columns = (ArrowArray **) palloc(sizeof(ArrowArray *) * indexInfo->ii_NumIndexAttrs), + }; + IndexInfo iinfo = *indexInfo; + + /* Translate index attribute numbers for the compressed relation */ + for (int i = 0; i < indexInfo->ii_NumIndexAttrs; i++) + { + const AttrNumber attno = indexInfo->ii_IndexAttrNumbers[i]; + const int16 cattoff = icstate.attrs_map[AttrNumberGetAttrOffset(attno)]; + const AttrNumber cattno = AttrOffsetGetAttrNumber(cattoff); + + iinfo.ii_IndexAttrNumbers[i] = cattno; + icstate.arrow_columns[i] = NULL; + } + + /* Include the count column in the callback. It is needed to know the + * uncompressed tuple count in case of building an index on the segmentby + * column. 
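+	 * For a segmentby-only index nothing is decompressed in the callback,
+	 * so the count column is the only way to know how many index entries
+	 * to emit per compressed row.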
*/ + iinfo.ii_IndexAttrNumbers[iinfo.ii_NumIndexAttrs++] = count_cattno; + + /* Check uniqueness on compressed */ + iinfo.ii_Unique = false; + iinfo.ii_ExclusionOps = NULL; + iinfo.ii_Predicate = NULL; + + /* TODO: special case for segmentby column */ + crel->rd_tableam->index_build_range_scan(crel, + indexRelation, + &iinfo, + allow_sync, + anyvisible, + progress, + start_blockno, + numblocks, + compression_index_build_callback, + &icstate, + scan); + + table_close(crel, NoLock); + FreeExecutorState(icstate.estate); + MemoryContextDelete(icstate.decompression_mcxt); + pfree(icstate.attrs_map); + pfree((void *) icstate.arrow_columns); + + return icstate.ntuples; +} + +static void +compressionam_index_validate_scan(Relation compressionRelation, Relation indexRelation, + IndexInfo *indexInfo, Snapshot snapshot, + ValidateIndexState *state) +{ + FEATURE_NOT_SUPPORTED; +} + +/* ------------------------------------------------------------------------ + * Miscellaneous callbacks for the compression AM + * ------------------------------------------------------------------------ + */ +static bool +compressionam_relation_needs_toast_table(Relation rel) +{ + return false; +} + +static Oid +compressionam_relation_toast_am(Relation rel) +{ + FEATURE_NOT_SUPPORTED; + return InvalidOid; +} + +/* ------------------------------------------------------------------------ + * Planner related callbacks for the compression AM + * ------------------------------------------------------------------------ + */ + +static uint64 +compressionam_relation_size(Relation rel, ForkNumber forkNumber) +{ + Oid cchunk_relid = get_compressed_chunk_relid(RelationGetRelid(rel)); + Relation crel = try_relation_open(cchunk_relid, AccessShareLock); + + if (crel == NULL) + return 0; + + uint64 size = table_block_relation_size(rel, forkNumber); + + size += crel->rd_tableam->relation_size(crel, forkNumber); + relation_close(crel, AccessShareLock); + return size; +} + +static void +compressionam_relation_estimate_size(Relation rel, int32 *attr_widths, BlockNumber *pages, + double *tuples, double *allvisfrac) +{ + Oid cchunk_relid = get_compressed_chunk_relid(RelationGetRelid(rel)); + + if (!OidIsValid(cchunk_relid)) + return; + + Relation crel = table_open(cchunk_relid, AccessShareLock); + + /* Cannot pass on attr_widths since they are cached widths for the + * non-compressed relation which also doesn't have the same number of + * attribute (columns). If we pass NULL it will use an estimate + * instead. 
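+	 * The resulting tuple estimate is an upper bound, since
+	 * GLOBAL_MAX_ROWS_PER_COMPRESSION is the maximum number of rows per
+	 * compressed batch.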
*/ + crel->rd_tableam->relation_estimate_size(crel, NULL, pages, tuples, allvisfrac); + + *tuples *= GLOBAL_MAX_ROWS_PER_COMPRESSION; + + // TODO: merge with uncompressed rel size + table_close(crel, AccessShareLock); +} + +static void +compressionam_fetch_toast_slice(Relation toastrel, Oid valueid, int32 attrsize, int32 sliceoffset, + int32 slicelength, struct varlena *result) +{ + FEATURE_NOT_SUPPORTED; +} + +/* ------------------------------------------------------------------------ + * Executor related callbacks for the compression AM + * ------------------------------------------------------------------------ + */ + +static bool +compressionam_scan_bitmap_next_block(TableScanDesc scan, TBMIterateResult *tbmres) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +static bool +compressionam_scan_bitmap_next_tuple(TableScanDesc scan, TBMIterateResult *tbmres, + TupleTableSlot *slot) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +static bool +compressionam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +static bool +compressionam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, + TupleTableSlot *slot) +{ + FEATURE_NOT_SUPPORTED; + return false; +} + +void +compressionam_handler_start_conversion(Oid relid) +{ + MemoryContext oldcxt = MemoryContextSwitchTo(CurTransactionContext); + ConversionState *state = palloc0(sizeof(ConversionState)); + Relation relation = table_open(relid, AccessShareLock); + TupleDesc tupdesc = RelationGetDescr(relation); + CompressionInfoData *cdata = build_compression_info_data(relation); + AttrNumber *sort_keys = palloc0(sizeof(*sort_keys) * cdata->num_keys); + Oid *sort_operators = palloc0(sizeof(*sort_operators) * cdata->num_keys); + Oid *sort_collations = palloc0(sizeof(*sort_collations) * cdata->num_keys); + bool *nulls_first = palloc0(sizeof(*nulls_first) * cdata->num_keys); + const CompressionSettings *settings = cdata->settings; + + Assert(cdata->num_segmentby == ts_array_length(settings->fd.segmentby)); + + state->relid = relid; + + for (int i = 0; i < tupdesc->natts; i++) + { + const Form_pg_attribute attr = &tupdesc->attrs[i]; + const char *attname = NameStr(attr->attname); + int segmentby_pos = ts_array_position(settings->fd.segmentby, attname); + int orderby_pos = ts_array_position(settings->fd.orderby, attname); + + if (segmentby_pos > 0 || orderby_pos > 0) + { + TypeCacheEntry *tentry; + int sort_index = -1; + Oid sort_op = InvalidOid; + + tentry = lookup_type_cache(attr->atttypid, TYPECACHE_LT_OPR | TYPECACHE_GT_OPR); + + if (segmentby_pos > 0) + { + sort_index = segmentby_pos - 1; + sort_op = tentry->lt_opr; + } + else if (orderby_pos > 0) + { + bool orderby_asc = + !ts_array_get_element_bool(settings->fd.orderby_desc, orderby_pos); + sort_index = cdata->num_segmentby + (orderby_pos - 1); + sort_op = orderby_asc ? 
tentry->lt_opr : tentry->gt_opr; + } + + if (!OidIsValid(sort_op)) + elog(ERROR, + "no valid sort operator for column \"%s\" of type \"%s\"", + attname, + format_type_be(attr->atttypid)); + + sort_keys[sort_index] = attr->attnum; + sort_operators[sort_index] = sort_op; + } + } + + state->cdata = cdata; + state->tuplesortstate = tuplesort_begin_heap(tupdesc, + cdata->num_keys, + sort_keys, + sort_operators, + sort_collations, + nulls_first, + maintenance_work_mem, + NULL, + false /*=randomAccess*/); + + relation_close(relation, AccessShareLock); + conversionstate = state; + MemoryContextSwitchTo(oldcxt); +} + +void +compressionam_handler_end_conversion(Oid relid) +{ + Chunk *chunk = ts_chunk_get_by_relid(conversionstate->relid, true); + Relation relation = table_open(conversionstate->relid, AccessShareLock); + TupleDesc tupdesc = RelationGetDescr(relation); + + if (!chunk) + elog(ERROR, "could not find uncompressed chunk for relation %s", get_rel_name(relid)); + + Hypertable *ht = ts_hypertable_get_by_id(chunk->fd.hypertable_id); + Hypertable *ht_compressed = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id); + Chunk *c_chunk = create_compress_chunk(ht_compressed, chunk, InvalidOid); + Relation compressed_rel = table_open(c_chunk->table_id, RowExclusiveLock); + RowCompressor row_compressor; + + tuplesort_performsort(conversionstate->tuplesortstate); + + row_compressor_init(conversionstate->cdata->settings, + &row_compressor, + relation, + compressed_rel, + RelationGetDescr(compressed_rel)->natts, + true /*need_bistate*/, + HEAP_INSERT_FROZEN); + + row_compressor_append_sorted_rows(&row_compressor, + conversionstate->tuplesortstate, + tupdesc, + compressed_rel); + + row_compressor_close(&row_compressor); + tuplesort_end(conversionstate->tuplesortstate); + + /* Update compression statistics */ + RelationSize before_size = ts_relation_size_impl(chunk->table_id); + RelationSize after_size = ts_relation_size_impl(c_chunk->table_id); + compression_chunk_size_catalog_insert(chunk->fd.id, + &before_size, + c_chunk->fd.id, + &after_size, + row_compressor.rowcnt_pre_compression, + row_compressor.num_compressed_rows, + row_compressor.num_compressed_rows); + + /* Copy chunk constraints (including fkey) to compressed chunk. + * Do this after compressing the chunk to avoid holding strong, unnecessary locks on the + * referenced table during compression. + */ + ts_chunk_constraints_create(ht_compressed, c_chunk); + ts_trigger_create_all_on_chunk(c_chunk); + ts_chunk_set_compressed_chunk(chunk, c_chunk->fd.id); + + table_close(relation, NoLock); + table_close(compressed_rel, NoLock); + conversionstate = NULL; +} + +/* ------------------------------------------------------------------------ + * Definition of the compression table access method. + * ------------------------------------------------------------------------ + */ + +static const TableAmRoutine compressionam_methods = { + .type = T_TableAmRoutine, + + .slot_callbacks = compressionam_slot_callbacks, + + .scan_begin = compressionam_beginscan, + .scan_end = compressionam_endscan, + .scan_rescan = compressionam_rescan, + .scan_getnextslot = compressionam_getnextslot, +#if PG14_GE + /*----------- + * Optional functions to provide scanning for ranges of ItemPointers. + * Implementations must either provide both of these functions, or neither + * of them. 
+ */ + .scan_set_tidrange = NULL, + .scan_getnextslot_tidrange = NULL, +#endif + /* ------------------------------------------------------------------------ + * Parallel table scan related functions. + * ------------------------------------------------------------------------ + */ + .parallelscan_estimate = table_block_parallelscan_estimate, + .parallelscan_initialize = table_block_parallelscan_initialize, + .parallelscan_reinitialize = table_block_parallelscan_reinitialize, + + /* ------------------------------------------------------------------------ + * Index Scan Callbacks + * ------------------------------------------------------------------------ + */ + .index_fetch_begin = compressionam_index_fetch_begin, + .index_fetch_reset = compressionam_index_fetch_reset, + .index_fetch_end = compressionam_index_fetch_end, + .index_fetch_tuple = compressionam_index_fetch_tuple, + + /* ------------------------------------------------------------------------ + * Manipulations of physical tuples. + * ------------------------------------------------------------------------ + */ + .tuple_insert = compressionam_tuple_insert, + .tuple_insert_speculative = compressionam_tuple_insert_speculative, + .tuple_complete_speculative = compressionam_tuple_complete_speculative, + .multi_insert = compressionam_multi_insert, + .tuple_delete = compressionam_tuple_delete, + .tuple_update = compressionam_tuple_update, + .tuple_lock = compressionam_tuple_lock, + + .finish_bulk_insert = compressionam_finish_bulk_insert, + + /* ------------------------------------------------------------------------ + * Callbacks for non-modifying operations on individual tuples + * ------------------------------------------------------------------------ + */ + .tuple_fetch_row_version = compressionam_fetch_row_version, + + .tuple_get_latest_tid = compressionam_get_latest_tid, + .tuple_tid_valid = compressionam_tuple_tid_valid, + .tuple_satisfies_snapshot = compressionam_tuple_satisfies_snapshot, +#if PG14_GE + .index_delete_tuples = compressionam_index_delete_tuples, +#endif + +/* ------------------------------------------------------------------------ + * DDL related functionality. + * ------------------------------------------------------------------------ + */ +#if PG16_GE + .relation_set_new_filelocator = compressionam_relation_set_new_filelocator, +#else + .relation_set_new_filenode = compressionam_relation_set_new_filelocator, +#endif + .relation_nontransactional_truncate = compressionam_relation_nontransactional_truncate, + .relation_copy_data = compressionam_relation_copy_data, + .relation_copy_for_cluster = compressionam_relation_copy_for_cluster, + .relation_vacuum = compressionam_vacuum_rel, + .scan_analyze_next_block = compressionam_scan_analyze_next_block, + .scan_analyze_next_tuple = compressionam_scan_analyze_next_tuple, + .index_build_range_scan = compressionam_index_build_range_scan, + .index_validate_scan = compressionam_index_validate_scan, + + /* ------------------------------------------------------------------------ + * Miscellaneous functions. + * ------------------------------------------------------------------------ + */ + .relation_size = compressionam_relation_size, + .relation_needs_toast_table = compressionam_relation_needs_toast_table, + .relation_toast_am = compressionam_relation_toast_am, + .relation_fetch_toast_slice = compressionam_fetch_toast_slice, + + /* ------------------------------------------------------------------------ + * Planner related functions. 
+     * ------------------------------------------------------------------------
+     */
+    .relation_estimate_size = compressionam_relation_estimate_size,
+
+    /* ------------------------------------------------------------------------
+     * Executor related functions.
+     * ------------------------------------------------------------------------
+     */
+    .scan_bitmap_next_block = compressionam_scan_bitmap_next_block,
+    .scan_bitmap_next_tuple = compressionam_scan_bitmap_next_tuple,
+    .scan_sample_next_block = compressionam_scan_sample_next_block,
+    .scan_sample_next_tuple = compressionam_scan_sample_next_tuple,
+};
+
+const TableAmRoutine *
+compressionam_routine(void)
+{
+    return &compressionam_methods;
+}
+
+Datum
+compressionam_handler(PG_FUNCTION_ARGS)
+{
+    PG_RETURN_POINTER(&compressionam_methods);
+}
diff --git a/tsl/src/compression/compressionam_handler.h b/tsl/src/compression/compressionam_handler.h
new file mode 100644
index 000000000..8eeee9175
--- /dev/null
+++ b/tsl/src/compression/compressionam_handler.h
@@ -0,0 +1,20 @@
+/*
+ * This file and its contents are licensed under the Timescale License.
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-TIMESCALE for a copy of the license.
+ */
+#ifndef TIMESCALEDB_TSL_COMPRESSIONAM_HANDLER_H
+#define TIMESCALEDB_TSL_COMPRESSIONAM_HANDLER_H
+
+#include <postgres.h>
+#include <access/tableam.h>
+#include <fmgr.h>
+
+#include "hypertable.h"
+
+extern const TableAmRoutine *compressionam_routine(void);
+extern void compressionam_handler_start_conversion(Oid relid);
+extern void compressionam_handler_end_conversion(Oid relid);
+extern Datum compressionam_handler(PG_FUNCTION_ARGS);
+
+#endif /* TIMESCALEDB_TSL_COMPRESSIONAM_HANDLER_H */
diff --git a/tsl/src/init.c b/tsl/src/init.c
index 81acbe34a..c0a3ba56a 100644
--- a/tsl/src/init.c
+++ b/tsl/src/init.c
@@ -22,6 +22,7 @@
 #include "compression/algorithms/gorilla.h"
 #include "compression/api.h"
 #include "compression/compression.h"
+#include "compression/compressionam_handler.h"
 #include "compression/create.h"
 #include "compression/segment_meta.h"
 #include "config.h"
@@ -160,7 +161,8 @@ CrossModuleFunctions tsl_cm_functions = {
     .decompress_chunk = tsl_decompress_chunk,
     .decompress_batches_for_insert = decompress_batches_for_insert,
     .decompress_target_segments = decompress_target_segments,
-
+    .compressionam_handler = compressionam_handler,
+    .ddl_command_start = tsl_ddl_command_start,
     .show_chunk = chunk_show,
     .create_compressed_chunk = tsl_create_compressed_chunk,
     .create_chunk = chunk_create,
diff --git a/tsl/src/planner.c b/tsl/src/planner.c
index 578f876d7..f2325533b 100644
--- a/tsl/src/planner.c
+++ b/tsl/src/planner.c
@@ -14,13 +14,11 @@
 #include
 #include "compat/compat.h"
-
 #include "chunk.h"
 #include "chunkwise_agg.h"
 #include "continuous_aggs/planner.h"
 #include "guc.h"
 #include "hypertable.h"
-#include "hypertable_cache.h"
 #include "nodes/decompress_chunk/decompress_chunk.h"
 #include "nodes/frozen_chunk_dml/frozen_chunk_dml.h"
 #include "nodes/gapfill/gapfill.h"
@@ -150,7 +148,6 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT
     else if (ts_is_hypercore_am(chunk->amoid))
     {
         /* To be implemented */
-        Assert(false);
     }
 }
diff --git a/tsl/src/process_utility.c b/tsl/src/process_utility.c
index dd662af95..d706a77f6 100644
--- a/tsl/src/process_utility.c
+++ b/tsl/src/process_utility.c
@@ -8,13 +8,54 @@
 #include
 #include
 #include
+#include <commands/event_trigger.h> /* assumed: EventTriggerData */
+#include <commands/tablecmds.h>     /* assumed: AlterTableLookupRelation */
+#include <nodes/parsenodes.h>       /* assumed: AlterTableStmt, AlterTableCmd */
+#include "compression/compressionam_handler.h"
 #include "compression/create.h"
 #include "continuous_aggs/create.h"
 #include "hypertable_cache.h"
 #include "process_utility.h"
#include "ts_catalog/continuous_agg.h" +void +tsl_ddl_command_start(ProcessUtilityArgs *args) +{ + switch (nodeTag(args->parsetree)) + { + case T_AlterTableStmt: + { + AlterTableStmt *stmt = castNode(AlterTableStmt, args->parsetree); + ListCell *lc; + + foreach (lc, stmt->cmds) + { + AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc); + + switch (cmd->subtype) + { +#if PG15_GE + case AT_SetAccessMethod: + if (strcmp(cmd->name, "tscompression") == 0) + { + Oid relid = AlterTableLookupRelation(stmt, NoLock); + compressionam_handler_start_conversion(relid); + } + break; +#endif + default: + break; + } + } + + break; + } + default: + break; + } +} + /* AlterTableCmds that need tsl side processing invoke this function * we only process AddColumn command right now. */ diff --git a/tsl/src/process_utility.h b/tsl/src/process_utility.h index 6a8368892..1cb53295f 100644 --- a/tsl/src/process_utility.h +++ b/tsl/src/process_utility.h @@ -5,7 +5,10 @@ */ #pragma once +#include #include extern void tsl_process_altertable_cmd(Hypertable *ht, const AlterTableCmd *cmd); extern void tsl_process_rename_cmd(Oid relid, Cache *hcache, const RenameStmt *stmt); +extern void tsl_ddl_command_start(ProcessUtilityArgs *args); +extern void tsl_ddl_command_end(EventTriggerData *command); diff --git a/tsl/test/expected/compression_tam.out b/tsl/test/expected/compression_tam.out new file mode 100644 index 000000000..b367c0ac0 --- /dev/null +++ b/tsl/test/expected/compression_tam.out @@ -0,0 +1,55 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE TABLE readings(time timestamptz, location int, device int, temp float, humidity float); +SELECT create_hypertable('readings', 'time'); +NOTICE: adding not-null constraint to column "time" + create_hypertable +----------------------- + (1,public,readings,t) +(1 row) + +INSERT INTO readings (time, location, device, temp, humidity) +SELECT t, ceil(random()*10), ceil(random()*30), random()*40, random()*100 +FROM generate_series('2022-06-01'::timestamptz, '2022-07-01', '5s') t; +ALTER TABLE readings SET ( + timescaledb.compress, + timescaledb.compress_orderby = 'time', + timescaledb.compress_segmentby = 'device' +); +SET timescaledb.enable_transparent_decompression TO false; +SELECT format('%I.%I', chunk_schema, chunk_name)::regclass AS chunk + FROM timescaledb_information.chunks + WHERE format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'readings'::regclass + LIMIT 1 \gset +-- We do some basic checks that the compressed data is the same as the +-- uncompressed. In this case, we just count the rows for each device. +SELECT device, count(*) INTO orig FROM readings GROUP BY device; +-- We should be able to set the table access method for a chunk, which +-- will automatically compress the chunk. +ALTER TABLE :chunk SET ACCESS METHOD tscompression; +-- This should compress the chunk +SELECT chunk_name FROM chunk_compression_stats('readings') WHERE compression_status='Compressed'; + chunk_name +------------------ + _hyper_1_1_chunk +(1 row) + +-- Should give the same result as above +SELECT device, count(*) INTO comp FROM readings GROUP BY device; +-- Row counts for each device should match, so this should be empty. +SELECT device FROM orig JOIN comp USING (device) WHERE orig.count != comp.count; + device +-------- +(0 rows) + +-- We should be able to change it back to heap. 
+ALTER TABLE :chunk SET ACCESS METHOD heap;
+-- Should give the same result as above
+SELECT device, count(*) INTO decomp FROM readings GROUP BY device;
+-- Row counts for each device should match, so this should be empty.
+SELECT device FROM orig JOIN decomp USING (device) WHERE orig.count != decomp.count;
+ device 
+--------
+(0 rows)
+
diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out
index d7b414bf2..0d7db0344 100644
--- a/tsl/test/shared/expected/extension.out
+++ b/tsl/test/shared/expected/extension.out
@@ -206,6 +206,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
  debug_waitpoint_enable(text)
  debug_waitpoint_id(text)
  debug_waitpoint_release(text)
+ ts_compressionam_handler(internal)
  ts_now_mock()
  add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval)
  add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text)
diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt
index 7c09db6f3..0dfe5b0eb 100644
--- a/tsl/test/sql/CMakeLists.txt
+++ b/tsl/test/sql/CMakeLists.txt
@@ -110,7 +110,7 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "15"))
   list(APPEND TEST_FILES bgw_scheduler_control.sql)
 endif()
 list(APPEND TEST_FILES merge_compress.sql cagg_query_using_merge.sql
-     cagg_refresh_using_merge.sql)
+     cagg_refresh_using_merge.sql compression_tam.sql)
 endif()
 
 if((${PG_VERSION_MAJOR} GREATER_EQUAL "17"))
diff --git a/tsl/test/sql/compression_tam.sql b/tsl/test/sql/compression_tam.sql
new file mode 100644
index 000000000..c70f02a7d
--- /dev/null
+++ b/tsl/test/sql/compression_tam.sql
@@ -0,0 +1,50 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+
+CREATE TABLE readings(time timestamptz, location int, device int, temp float, humidity float);
+
+SELECT create_hypertable('readings', 'time');
+
+INSERT INTO readings (time, location, device, temp, humidity)
+SELECT t, ceil(random()*10), ceil(random()*30), random()*40, random()*100
+FROM generate_series('2022-06-01'::timestamptz, '2022-07-01', '5s') t;
+
+ALTER TABLE readings SET (
+    timescaledb.compress,
+    timescaledb.compress_orderby = 'time',
+    timescaledb.compress_segmentby = 'device'
+);
+
+SET timescaledb.enable_transparent_decompression TO false;
+
+SELECT format('%I.%I', chunk_schema, chunk_name)::regclass AS chunk
+  FROM timescaledb_information.chunks
+  WHERE format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'readings'::regclass
+  LIMIT 1 \gset
+
+-- We do some basic checks that the compressed data is the same as the
+-- uncompressed. In this case, we just count the rows for each device.
+SELECT device, count(*) INTO orig FROM readings GROUP BY device;
+
+-- We should be able to set the table access method for a chunk, which
+-- will automatically compress the chunk.
+ALTER TABLE :chunk SET ACCESS METHOD tscompression;
+
+-- This should compress the chunk
+SELECT chunk_name FROM chunk_compression_stats('readings') WHERE compression_status='Compressed';
+
+-- Should give the same result as above
+SELECT device, count(*) INTO comp FROM readings GROUP BY device;
+
+-- Row counts for each device should match, so this should be empty.
+SELECT device FROM orig JOIN comp USING (device) WHERE orig.count != comp.count;
+
+-- We should be able to change it back to heap.
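+-- Converting back should decompress the data into plain heap storage.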
+ALTER TABLE :chunk SET ACCESS METHOD heap;
+
+-- Should give the same result as above
+SELECT device, count(*) INTO decomp FROM readings GROUP BY device;
+
+-- Row counts for each device should match, so this should be empty.
+SELECT device FROM orig JOIN decomp USING (device) WHERE orig.count != decomp.count;
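+
+-- Extra sanity check (illustrative; not part of the original test and no
+-- matching entry in the expected output file): the chunk's pg_class entry
+-- should reference the heap access method again.
+SELECT am.amname FROM pg_class c JOIN pg_am am ON am.oid = c.relam
+WHERE c.oid = :'chunk'::regclass;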