mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-25 15:50:27 +08:00
Add initial compression TAM
Implement the table access method (TAM) API around compression in order to get, among other things, seamless index support on compressed data. The current functionality is rudimentary, but common operations work, including sequential scans.
This commit is contained in:
parent 7d7792c1f0
commit cb8c756a1d
cmake
sql
src
test/pgtest
tsl
@@ -27,6 +27,7 @@ set(PRE_INSTALL_FUNCTION_FILES
 set(SOURCE_FILES
     hypertable.sql
     chunk.sql
+    compression.sql
     ddl_internal.sql
     util_time.sql
     util_internal_table_ddl.sql
sql/compression.sql (new file, 9 lines)
@@ -0,0 +1,9 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

CREATE FUNCTION ts_compressionam_handler(internal) RETURNS table_am_handler
AS '@MODULE_PATHNAME@', 'ts_compressionam_handler' LANGUAGE C;

CREATE ACCESS METHOD tscompression TYPE TABLE HANDLER ts_compressionam_handler;
COMMENT ON ACCESS METHOD tscompression IS 'TimescaleDB columnar compression';
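Once the handler function and access method exist, the AM is attached to a table like any other PostgreSQL table access method. A minimal usage sketch, mirroring what the regression test further down does (the chunk name is illustrative):

    -- Attach the compression AM to a chunk; this converts/compresses it
    ALTER TABLE _timescaledb_internal._hyper_1_1_chunk SET ACCESS METHOD tscompression;

    -- Convert back to the default heap AM
    ALTER TABLE _timescaledb_internal._hyper_1_1_chunk SET ACCESS METHOD heap;

Note that ALTER TABLE ... SET ACCESS METHOD requires PostgreSQL 15 or later, which matches the PG15_GE guards in the hunks below.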
@@ -1 +1,3 @@
+-- Hyperstore AM
+DROP ACCESS METHOD IF EXISTS tscompression;
+DROP FUNCTION IF EXISTS ts_compressionam_handler;
@@ -78,6 +78,7 @@ CROSSMODULE_WRAPPER(array_compressor_finish);
 CROSSMODULE_WRAPPER(create_compressed_chunk);
 CROSSMODULE_WRAPPER(compress_chunk);
 CROSSMODULE_WRAPPER(decompress_chunk);
+CROSSMODULE_WRAPPER(compressionam_handler);

 /* continuous aggregate */
 CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
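CROSSMODULE_WRAPPER is what exposes ts_compressionam_handler at the SQL level in the community binary, dispatching into the TSL module when it is loaded. A hedged sketch of the general shape such a wrapper macro takes (the real definition lives in src/cross_module_fn.c and may differ):

    /* Illustrative only: define the SQL-callable ts_<name> entry point and
     * forward it to the currently loaded module via the ts_cm_functions
     * dispatch table. */
    #define CROSSMODULE_WRAPPER(func)                                        \
        PG_FUNCTION_INFO_V1(ts_##func);                                      \
        Datum ts_##func(PG_FUNCTION_ARGS)                                    \
        {                                                                    \
            return ts_cm_functions->func(fcinfo);                            \
        }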
@@ -276,6 +277,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
 	.set_rel_pathlist_dml = NULL,
 	.set_rel_pathlist_query = NULL,
 	.set_rel_pathlist = NULL,
+	.ddl_command_start = NULL,
 	.process_altertable_cmd = NULL,
 	.process_rename_cmd = NULL,

@@ -360,6 +362,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
 	.dictionary_compressor_finish = error_no_default_fn_pg_community,
 	.array_compressor_append = error_no_default_fn_pg_community,
 	.array_compressor_finish = error_no_default_fn_pg_community,
+	.compressionam_handler = error_no_default_fn_pg_community,

 	.show_chunk = error_no_default_fn_pg_community,
 	.create_chunk = error_no_default_fn_pg_community,
@@ -89,6 +89,8 @@ typedef struct CrossModuleFunctions
 	PGFunction reorder_chunk;
 	PGFunction move_chunk;

+	void (*ddl_command_start)(ProcessUtilityArgs *args);
+
 	/* Vectorized queries */
 	void (*tsl_postprocess_plan)(PlannedStmt *stmt);
@@ -143,6 +145,7 @@ typedef struct CrossModuleFunctions
 	PGFunction dictionary_compressor_finish;
 	PGFunction array_compressor_append;
 	PGFunction array_compressor_finish;
+	PGFunction compressionam_handler;

 	PGFunction create_chunk;
 	PGFunction show_chunk;
@@ -140,6 +140,10 @@ check_chunk_alter_table_operation_allowed(Oid relid, AlterTableStmt *stmt)
 		case AT_SetTableSpace:
 		case AT_ReAddStatistics:
 		case AT_SetCompression:
+#if PG15_GE
+		case AT_SetAccessMethod:
+#endif
 			/* allowed on chunks */
 			break;
 		case AT_AddConstraint:
@@ -4618,6 +4622,14 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool
 	 */
 	result = process_ddl_command_start(&args);

+	/*
+	 * We need to run tsl-side ddl_command_start hook before
+	 * standard process utility hook to maintain proper invocation
+	 * order of sql_drop and ddl_command_end triggers.
+	 */
+	if (ts_cm_functions->ddl_command_start)
+		ts_cm_functions->ddl_command_start(&args);
+
 	/*
 	 * We need to run tsl-side ddl_command_start hook before
 	 * standard process utility hook to maintain proper invocation
src/utils.c (19 lines changed)
@@ -1802,24 +1802,11 @@ static Oid hypercore_amoid = InvalidOid;
 bool
 ts_is_hypercore_am(Oid amoid)
 {
-	/* Can't use InvalidOid as an indication of non-cached value since
-	   get_am_oid() will return InvalidOid when the access method does not
-	   exist and we will do the lookup every time this query is called. This
-	   boolean can be removed once we know that there should exist an access
-	   method with the given name. */
-	static bool iscached = false;
-
-	if (!iscached && !OidIsValid(hypercore_amoid))
-	{
-		hypercore_amoid = get_am_oid("hypercore", true);
-		iscached = true;
-	}
-
-	if (!OidIsValid(hypercore_amoid))
-		return false;
+	hypercore_amoid = get_table_am_oid("tscompression", true);
+
+	/* Shouldn't get here for now */
+	Assert(false);
 	if (!OidIsValid(amoid) || !OidIsValid(hypercore_amoid))
 		return false;

 	return amoid == hypercore_amoid;
 }
@@ -20,7 +20,14 @@ file(READ ${PG_REGRESS_DIR}/parallel_schedule PG_TEST_SCHEDULE)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/testtablespace)

 # Tests to ignore
-set(PG_IGNORE_TESTS database event_trigger opr_sanity sanity_check type_sanity)
+set(PG_IGNORE_TESTS
+    database
+    event_trigger
+    opr_sanity
+    sanity_check
+    type_sanity
+    create_am
+    psql)

 # Modify the test schedule to ignore some tests
 foreach(IGNORE_TEST ${PG_IGNORE_TESTS})
@@ -1,9 +1,11 @@
 set(SOURCES
     ${CMAKE_CURRENT_SOURCE_DIR}/api.c
+    ${CMAKE_CURRENT_SOURCE_DIR}/arrow_tts.c
     ${CMAKE_CURRENT_SOURCE_DIR}/compression.c
     ${CMAKE_CURRENT_SOURCE_DIR}/compression_dml.c
     ${CMAKE_CURRENT_SOURCE_DIR}/compression_scankey.c
     ${CMAKE_CURRENT_SOURCE_DIR}/compression_storage.c
+    ${CMAKE_CURRENT_SOURCE_DIR}/compressionam_handler.c
     ${CMAKE_CURRENT_SOURCE_DIR}/create.c
     ${CMAKE_CURRENT_SOURCE_DIR}/segment_meta.c)
 target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
@@ -71,7 +71,7 @@ create_dummy_query()
 	return (Node *) query;
 }

-static void
+void
 compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size,
 									  int32 compress_chunk_id, const RelationSize *compress_size,
 									  int64 rowcnt_pre_compression, int64 rowcnt_post_compression,
@@ -7,6 +7,7 @@

 #include <postgres.h>
 #include <fmgr.h>
+#include <utils.h>

 #include "chunk.h"
@@ -18,3 +19,10 @@ extern Datum tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS);

 extern Datum tsl_get_compressed_chunk_index_for_recompression(
 	PG_FUNCTION_ARGS); // arg is oid of uncompressed chunk
+
+extern void compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size,
+												  int32 compress_chunk_id,
+												  const RelationSize *compress_size,
+												  int64 rowcnt_pre_compression,
+												  int64 rowcnt_post_compression,
+												  int64 rowcnt_frozen);
tsl/src/compression/arrow_tts.c (new file, 380 lines)
@@ -0,0 +1,380 @@
/*
 * This file and its contents are licensed under the Timescale License.
 * Please see the included NOTICE for copyright information and
 * LICENSE-TIMESCALE for a copy of the license.
 */
#include <postgres.h>
#include <access/attnum.h>
#include <catalog/pg_attribute.h>
#include <executor/tuptable.h>
#include <utils/expandeddatum.h>

#include "arrow_tts.h"
#include "compression.h"
#include "custom_type_cache.h"
#include "utils/palloc.h"

/*
 * Get a map of attribute offsets that maps non-compressed offsets to
 * compressed offsets.
 *
 * The map is needed since the compressed relation has additional metadata
 * columns that the non-compressed relation doesn't have. When adding new
 * columns, the new column's attribute number will be higher on the compressed
 * relation compared to the regular one.
 */
static const int16 *
arrow_slot_get_attribute_offset_map(TupleTableSlot *slot)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
    const TupleDesc tupdesc = slot->tts_tupleDescriptor;
    const TupleDesc ctupdesc = aslot->compressed_slot->tts_tupleDescriptor;

    if (NULL == aslot->attrs_offset_map)
    {
        /* Build the map once and cache it in the slot's own memory context */
        MemoryContext oldmcxt = MemoryContextSwitchTo(slot->tts_mcxt);
        aslot->attrs_offset_map = build_attribute_offset_map(tupdesc, ctupdesc, NULL);
        MemoryContextSwitchTo(oldmcxt);
    }

    return aslot->attrs_offset_map;
}

static void
tts_arrow_init(TupleTableSlot *slot)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;

    aslot->arrow_columns = NULL;
    aslot->compressed_slot = NULL;
    aslot->segmentby_columns = NULL;
    aslot->decompression_mcxt = AllocSetContextCreate(slot->tts_mcxt,
                                                      "bulk decompression",
                                                      /* minContextSize = */ 0,
                                                      /* initBlockSize = */ 64 * 1024,
                                                      /* maxBlockSize = */ 64 * 1024);
}

static void
tts_arrow_release(TupleTableSlot *slot)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;

    if (NULL != aslot->arrow_columns)
    {
        aslot->arrow_columns = NULL;
    }
}

static void
tts_arrow_clear(TupleTableSlot *slot)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;

    if (unlikely(TTS_SHOULDFREE(slot)))
    {
        /* The tuple is materialized, so free materialized memory */
    }

    if (aslot->arrow_columns)
    {
        for (int i = 0; i < slot->tts_tupleDescriptor->natts; i++)
        {
            ArrowArray *arr = aslot->arrow_columns[i];

            if (arr)
            {
                pfree(arr);
            }

            aslot->arrow_columns[i] = NULL;
        }

        pfree(aslot->arrow_columns);
        aslot->arrow_columns = NULL;
    }

    aslot->compressed_slot = NULL;

    slot->tts_nvalid = 0;
    slot->tts_flags |= TTS_FLAG_EMPTY;
    ItemPointerSetInvalid(&slot->tts_tid);
}

static inline void
tts_arrow_store_tuple(TupleTableSlot *slot, TupleTableSlot *compressed_slot, uint16 tuple_index)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;

    slot->tts_flags &= ~TTS_FLAG_EMPTY;
    aslot->compressed_slot = compressed_slot;
    aslot->tuple_index = tuple_index;
    /* Base the TID on the compressed tuple's TID and encode the row's index
     * within the compressed tuple in the high block-id bits */
    ItemPointerCopy(&compressed_slot->tts_tid, &slot->tts_tid);
    slot->tts_tid.ip_blkid.bi_hi = tuple_index;
    Assert(!TTS_EMPTY(aslot->compressed_slot));
}

TupleTableSlot *
ExecStoreArrowTuple(TupleTableSlot *slot, TupleTableSlot *compressed_slot, uint16 tuple_index)
{
    Assert(slot != NULL);
    Assert(slot->tts_tupleDescriptor != NULL);
    Assert(!TTS_EMPTY(compressed_slot));

    if (unlikely(!TTS_IS_ARROWTUPLE(slot)))
        elog(ERROR, "trying to store an on-disk parquet tuple into wrong type of slot");

    ExecClearTuple(slot);
    tts_arrow_store_tuple(slot, compressed_slot, tuple_index);

    Assert(!TTS_EMPTY(slot));
    Assert(!TTS_SHOULDFREE(slot));

    return slot;
}

TupleTableSlot *
ExecStoreArrowTupleExisting(TupleTableSlot *slot, uint16 tuple_index)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;

    Assert(!TTS_EMPTY(slot));
    aslot->tuple_index = tuple_index;
    slot->tts_tid.ip_blkid.bi_hi = aslot->tuple_index;
    slot->tts_nvalid = 0;

    return slot;
}

/*
 * Materialize an Arrow slot.
 */
static void
tts_arrow_materialize(TupleTableSlot *slot)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;

    Assert(!TTS_EMPTY(slot));

    TTSOpsBufferHeapTuple.materialize(slot);

    if (aslot->compressed_slot)
        ExecMaterializeSlot(aslot->compressed_slot);
}

static bool
is_compressed_col(const TupleDesc tupdesc, AttrNumber attno)
{
    static CustomTypeInfo *typinfo = NULL;
    Oid coltypid = tupdesc->attrs[AttrNumberGetAttrOffset(attno)].atttypid;

    if (typinfo == NULL)
        typinfo = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA);

    return coltypid == typinfo->type_oid;
}

static void
tts_arrow_getsomeattrs(TupleTableSlot *slot, int natts)
{
    ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
    const TupleDesc tupdesc = slot->tts_tupleDescriptor;
    const TupleDesc compressed_tupdesc = aslot->compressed_slot->tts_tupleDescriptor;
    const int16 *attrs_map;
    int16 cnatts = -1;

    if (natts < 1 || natts > slot->tts_tupleDescriptor->natts)
        elog(ERROR, "invalid number of attributes requested");

    attrs_map = arrow_slot_get_attribute_offset_map(slot);

    /* Find highest attribute number/offset in compressed relation */
    for (int i = 0; i < natts; i++)
    {
        int16 coff = attrs_map[i];

        if (AttrOffsetGetAttrNumber(coff) > cnatts)
            cnatts = AttrOffsetGetAttrNumber(coff);
    }

    Assert(cnatts > 0);

    slot_getsomeattrs(aslot->compressed_slot, cnatts);

    if (NULL == aslot->arrow_columns)
    {
        aslot->arrow_columns =
            MemoryContextAllocZero(slot->tts_mcxt,
                                   sizeof(ArrowArray *) * slot->tts_tupleDescriptor->natts);
    }

    for (int i = 0; i < natts; i++)
    {
        const int16 cattoff = attrs_map[i];
        const AttrNumber attno = AttrOffsetGetAttrNumber(i);
        const AttrNumber cattno = AttrOffsetGetAttrNumber(cattoff);

        /* Decompress the column if not already done. */
        if (aslot->arrow_columns[i] == NULL)
        {
            if (is_compressed_col(compressed_tupdesc, cattno))
            {
                bool isnull;
                Datum value = slot_getattr(aslot->compressed_slot, cattno, &isnull);

                if (isnull)
                {
                    /* Leave the column undecompressed; the NULL case is
                     * handled below via getmissingattr() */
                }
                else
                {
                    const Form_pg_attribute attr = &tupdesc->attrs[i];
                    const CompressedDataHeader *header =
                        (CompressedDataHeader *) PG_DETOAST_DATUM(value);
                    DecompressAllFunction decompress_all =
                        tsl_get_decompress_all_function(header->compression_algorithm,
                                                        attr->atttypid);
                    Assert(decompress_all != NULL);
                    MemoryContext oldcxt = MemoryContextSwitchTo(aslot->decompression_mcxt);
                    aslot->arrow_columns[i] =
                        decompress_all(PointerGetDatum(header),
                                       slot->tts_tupleDescriptor->attrs[i].atttypid,
                                       slot->tts_mcxt);
                    MemoryContextReset(aslot->decompression_mcxt);
                    MemoryContextSwitchTo(oldcxt);
                }
            }
            else
            {
                /* Since we are looping over the attributes of the
                 * non-compressed slot, we will either see only compressed
                 * columns or the segment-by column. If the column is not
                 * compressed, it must be the segment-by column. The
                 * segment-by column is not compressed and the value is the
                 * same for all rows in the compressed tuple. */
                aslot->arrow_columns[i] = NULL;
                slot->tts_values[i] =
                    slot_getattr(aslot->compressed_slot, cattno, &slot->tts_isnull[i]);

                /* Remember the segment-by column */
                MemoryContext oldcxt = MemoryContextSwitchTo(slot->tts_mcxt);
                aslot->segmentby_columns = bms_add_member(aslot->segmentby_columns, attno);
                MemoryContextSwitchTo(oldcxt);
            }
        }

        /* At this point the column should be decompressed, if it is a
         * compressed column. */
        if (bms_is_member(attno, aslot->segmentby_columns))
        {
            /* Segment-by column. Value already set. */
        }
        else if (aslot->arrow_columns[i] == NULL)
        {
            /* Since the column is not the segment-by column, and there is no
             * decompressed data, the column must be NULL. Use the default
             * value. */
            slot->tts_values[i] = getmissingattr(slot->tts_tupleDescriptor,
                                                 AttrOffsetGetAttrNumber(i),
                                                 &slot->tts_isnull[i]);
        }
        else
        {
            const char *restrict values = aslot->arrow_columns[i]->buffers[1];
            const uint64 *restrict validity = aslot->arrow_columns[i]->buffers[0];
            int16 value_bytes = get_typlen(slot->tts_tupleDescriptor->attrs[i].atttypid);

            /*
             * The conversion of Datum to more narrow types will truncate
             * the higher bytes, so we don't care if we read some garbage
             * into them, and can always read 8 bytes. These are unaligned
             * reads, so technically we have to do memcpy.
             */
            uint64 value;
            memcpy(&value, &values[value_bytes * aslot->tuple_index], 8);

#ifdef USE_FLOAT8_BYVAL
            Datum datum = Int64GetDatum(value);
#else
            /*
             * On 32-bit systems, the data larger than 4 bytes go by
             * reference, so we have to jump through these hoops.
             */
            Datum datum;
            if (value_bytes <= 4)
            {
                datum = Int32GetDatum((uint32) value);
            }
            else
            {
                datum = Int64GetDatum(value);
            }
#endif
            slot->tts_values[i] = datum;
            slot->tts_isnull[i] = !arrow_row_is_valid(validity, aslot->tuple_index);
        }
    }

    slot->tts_nvalid = natts;
}

/*
 * System attributes are not supported on Arrow slots.
 */
static Datum
tts_arrow_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
{
    Assert(!TTS_EMPTY(slot));

    ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("cannot retrieve a system column in this context")));

    return 0; /* silence compiler warnings */
}

static void
tts_arrow_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
{
    tts_arrow_clear(dstslot);
    slot_getallattrs(srcslot);

    dstslot->tts_flags &= ~TTS_FLAG_EMPTY;

    /* make sure storage doesn't depend on external memory */
    tts_arrow_materialize(dstslot);
}

static HeapTuple
tts_arrow_copy_heap_tuple(TupleTableSlot *slot)
{
    HeapTuple tuple;

    Assert(!TTS_EMPTY(slot));

    tts_arrow_materialize(slot);
    tuple = heap_form_tuple(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull);
    ItemPointerCopy(&slot->tts_tid, &tuple->t_self);

    return tuple;
}

static MinimalTuple
tts_arrow_copy_minimal_tuple(TupleTableSlot *slot)
{
    Assert(!TTS_EMPTY(slot));
    tts_arrow_materialize(slot);

    return heap_form_minimal_tuple(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull);
}

const TupleTableSlotOps TTSOpsArrowTuple = { .base_slot_size = sizeof(ArrowTupleTableSlot),
                                             .init = tts_arrow_init,
                                             .release = tts_arrow_release,
                                             .clear = tts_arrow_clear,
                                             .getsomeattrs = tts_arrow_getsomeattrs,
                                             .getsysattr = tts_arrow_getsysattr,
                                             .materialize = tts_arrow_materialize,
                                             .copyslot = tts_arrow_copyslot,
                                             .get_heap_tuple = NULL,
                                             .get_minimal_tuple = NULL,
                                             .copy_heap_tuple = tts_arrow_copy_heap_tuple,
                                             .copy_minimal_tuple = tts_arrow_copy_minimal_tuple };
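For orientation, here is a hedged sketch of how a table AM scan might hand rows back through the new slot type. The function and parameter names are hypothetical; the real wiring lives in the suppressed compressionam_handler.c:

    /* Illustration only (assumes arrow_tts.h): emit the rows packed inside
     * one compressed tuple, one at a time, through an Arrow slot. */
    static bool
    example_emit_next_row(TupleTableSlot *compressed_slot, uint16 *row, uint16 nrows,
                          TupleTableSlot *out_slot)
    {
        if (*row >= nrows)
            return false; /* caller fetches the next compressed tuple */

        /* First row: bind the compressed tuple; later rows: just bump the
         * index, which resets tts_nvalid so attributes are re-fetched. */
        if (*row == 0)
            ExecStoreArrowTuple(out_slot, compressed_slot, *row);
        else
            ExecStoreArrowTupleExisting(out_slot, *row);

        (*row)++;
        return true;
    }

The out_slot would be created with MakeSingleTupleTableSlot(tupdesc, &TTSOpsArrowTuple), so that tts_arrow_getsomeattrs decompresses columns lazily only when the executor actually asks for them.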
tsl/src/compression/arrow_tts.h (new file, 96 lines)
@@ -0,0 +1,96 @@
/*
 * This file and its contents are licensed under the Timescale License.
 * Please see the included NOTICE for copyright information and
 * LICENSE-TIMESCALE for a copy of the license.
 */
#ifndef PG_ARROW_TUPTABLE_H
#define PG_ARROW_TUPTABLE_H

#include <postgres.h>
#include <access/attnum.h>
#include <executor/tuptable.h>
#include <nodes/bitmapset.h>
#include <utils/builtins.h>

#include "arrow_c_data_interface.h"
#include "compression/create.h"
#include "nodes/decompress_chunk/detoaster.h"

typedef struct ArrowTupleTableSlot
{
    BufferHeapTupleTableSlot base;
    TupleTableSlot *compressed_slot;
    ArrowArray **arrow_columns;
    uint16 tuple_index; /* Index of this particular tuple in the compressed (columnar data) tuple */
    MemoryContext decompression_mcxt;
    Bitmapset *segmentby_columns;
    char *data;
    int16 *attrs_offset_map;
} ArrowTupleTableSlot;

extern const TupleTableSlotOps TTSOpsArrowTuple;

/*
 * Map each attribute offset of the non-compressed tuple descriptor to the
 * offset of the same-named attribute in the compressed tuple descriptor.
 * Optionally also return the attribute number of the compressed relation's
 * count metadata column via count_attno.
 */
static inline int16 *
build_attribute_offset_map(const TupleDesc tupdesc, const TupleDesc ctupdesc,
                           AttrNumber *count_attno)
{
    int16 *attrs_offset_map = palloc0(sizeof(int16) * tupdesc->natts);

    for (int i = 0; i < tupdesc->natts; i++)
    {
        const Form_pg_attribute attr = TupleDescAttr(tupdesc, i);

        if (attr->attisdropped)
        {
            attrs_offset_map[i] = -1;
        }
        else
        {
            bool found = false;

            for (int j = 0; j < ctupdesc->natts; j++)
            {
                const Form_pg_attribute cattr = TupleDescAttr(ctupdesc, j);

                if (!cattr->attisdropped &&
                    namestrcmp(&attr->attname, NameStr(cattr->attname)) == 0)
                {
                    attrs_offset_map[i] = AttrNumberGetAttrOffset(cattr->attnum);
                    found = true;
                    break;
                }
            }

            Ensure(found, "missing attribute in compressed relation");
        }
    }

    if (count_attno)
    {
        *count_attno = InvalidAttrNumber;

        /* Find the count column attno */
        for (int i = 0; i < ctupdesc->natts; i++)
        {
            const Form_pg_attribute cattr = TupleDescAttr(ctupdesc, i);

            if (namestrcmp(&cattr->attname, COMPRESSION_COLUMN_METADATA_COUNT_NAME) == 0)
            {
                *count_attno = cattr->attnum;
                break;
            }
        }

        Assert(*count_attno != InvalidAttrNumber);
    }

    return attrs_offset_map;
}

extern TupleTableSlot *ExecStoreArrowTuple(TupleTableSlot *slot, TupleTableSlot *compressed_slot,
                                           uint16 tuple_index);
extern TupleTableSlot *ExecStoreArrowTupleExisting(TupleTableSlot *slot, uint16 tuple_index);

#define TTS_IS_ARROWTUPLE(slot) ((slot)->tts_ops == &TTSOpsArrowTuple)

#endif /* PG_ARROW_TUPTABLE_H */
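A hypothetical worked example of the offset map, with illustrative column names and ordering: if the uncompressed chunk has columns (time, location, device, temp, humidity) at offsets 0..4, and the compressed chunk stores (device, _ts_meta_count, time, location, temp, humidity) at offsets 0..5, then build_attribute_offset_map returns [2, 3, 0, 4, 5] (each uncompressed offset maps to the offset of the same-named compressed column), and count_attno would point at the _ts_meta_count metadata column.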
tsl/src/compression/compressionam_handler.c (new file, 1191 lines; diff suppressed because it is too large)
tsl/src/compression/compressionam_handler.h (new file, 20 lines)
@@ -0,0 +1,20 @@
/*
 * This file and its contents are licensed under the Timescale License.
 * Please see the included NOTICE for copyright information and
 * LICENSE-TIMESCALE for a copy of the license.
 */
#ifndef TIMESCALEDB_TSL_COMPRESSIONAM_HANDLER_H
#define TIMESCALEDB_TSL_COMPRESSIONAM_HANDLER_H

#include <postgres.h>
#include <access/tableam.h>
#include <fmgr.h>
#include <nodes/pathnodes.h>

#include "hypertable.h"

extern const TableAmRoutine *compressionam_routine(void);
extern void compressionam_handler_start_conversion(Oid relid);
extern Datum compressionam_handler(PG_FUNCTION_ARGS);

#endif /* TIMESCALEDB_TSL_COMPRESSIONAM_HANDLER_H */
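Since the 1191-line compressionam_handler.c diff is suppressed above, here is a hedged sketch of the general shape a table AM handler takes, using only the names this header declares plus PostgreSQL's TableAmRoutine; the concrete callbacks in the suppressed file will differ:

    /* Hedged sketch, not the actual suppressed implementation. A real
     * routine fills in many callbacks: .slot_callbacks (which would hand
     * out TTSOpsArrowTuple), .scan_begin, .scan_getnextslot, index and DML
     * callbacks, and so on. */
    static const TableAmRoutine compressionam_methods = {
        .type = T_TableAmRoutine,
        /* ... scan, index, DML, and DDL callbacks ... */
    };

    const TableAmRoutine *
    compressionam_routine(void)
    {
        return &compressionam_methods;
    }

    /* The function behind CREATE ACCESS METHOD ... HANDLER: it simply
     * returns the routine pointer. */
    Datum
    compressionam_handler(PG_FUNCTION_ARGS)
    {
        PG_RETURN_POINTER(compressionam_routine());
    }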
@@ -22,6 +22,7 @@
 #include "compression/algorithms/gorilla.h"
 #include "compression/api.h"
 #include "compression/compression.h"
+#include "compression/compressionam_handler.h"
 #include "compression/create.h"
 #include "compression/segment_meta.h"
 #include "config.h"
@@ -160,7 +161,8 @@ CrossModuleFunctions tsl_cm_functions = {
 	.decompress_chunk = tsl_decompress_chunk,
 	.decompress_batches_for_insert = decompress_batches_for_insert,
 	.decompress_target_segments = decompress_target_segments,
+	.compressionam_handler = compressionam_handler,
 	.ddl_command_start = tsl_ddl_command_start,
 	.show_chunk = chunk_show,
 	.create_compressed_chunk = tsl_create_compressed_chunk,
 	.create_chunk = chunk_create,
@@ -14,13 +14,11 @@
 #include <parser/parsetree.h>

 #include "compat/compat.h"

 #include "chunk.h"
 #include "chunkwise_agg.h"
 #include "continuous_aggs/planner.h"
 #include "guc.h"
 #include "hypertable.h"
 #include "hypertable_cache.h"
 #include "nodes/decompress_chunk/decompress_chunk.h"
 #include "nodes/frozen_chunk_dml/frozen_chunk_dml.h"
 #include "nodes/gapfill/gapfill.h"
@@ -150,7 +148,6 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT
 	else if (ts_is_hypercore_am(chunk->amoid))
 	{
 		/* To be implemented */
 		Assert(false);
 	}
 }
@@ -8,13 +8,54 @@
 #include <catalog/namespace.h>
 #include <catalog/pg_trigger.h>
 #include <commands/event_trigger.h>
+#include <commands/tablecmds.h>
+#include <nodes/nodes.h>
+#include <nodes/parsenodes.h>

+#include "compression/compressionam_handler.h"
 #include "compression/create.h"
 #include "continuous_aggs/create.h"
 #include "hypertable_cache.h"
 #include "process_utility.h"
 #include "ts_catalog/continuous_agg.h"

+void
+tsl_ddl_command_start(ProcessUtilityArgs *args)
+{
+	switch (nodeTag(args->parsetree))
+	{
+		case T_AlterTableStmt:
+		{
+			AlterTableStmt *stmt = castNode(AlterTableStmt, args->parsetree);
+			ListCell *lc;
+
+			foreach (lc, stmt->cmds)
+			{
+				AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc);
+
+				switch (cmd->subtype)
+				{
+#if PG15_GE
+					case AT_SetAccessMethod:
+						if (strcmp(cmd->name, "tscompression") == 0)
+						{
+							Oid relid = AlterTableLookupRelation(stmt, NoLock);
+							compressionam_handler_start_conversion(relid);
+						}
+						break;
+#endif
+					default:
+						break;
+				}
+			}
+
+			break;
+		}
+		default:
+			break;
+	}
+}
+
 /* AlterTableCmds that need tsl side processing invoke this function
  * we only process AddColumn command right now.
  */
@@ -5,7 +5,10 @@
  */
 #pragma once

+#include <commands/event_trigger.h>
 #include <process_utility.h>

 extern void tsl_process_altertable_cmd(Hypertable *ht, const AlterTableCmd *cmd);
 extern void tsl_process_rename_cmd(Oid relid, Cache *hcache, const RenameStmt *stmt);
+extern void tsl_ddl_command_start(ProcessUtilityArgs *args);
 extern void tsl_ddl_command_end(EventTriggerData *command);
tsl/test/expected/compression_tam.out (new file, 55 lines)
@@ -0,0 +1,55 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
CREATE TABLE readings(time timestamptz, location int, device int, temp float, humidity float);
SELECT create_hypertable('readings', 'time');
NOTICE:  adding not-null constraint to column "time"
   create_hypertable
-----------------------
 (1,public,readings,t)
(1 row)

INSERT INTO readings (time, location, device, temp, humidity)
SELECT t, ceil(random()*10), ceil(random()*30), random()*40, random()*100
FROM generate_series('2022-06-01'::timestamptz, '2022-07-01', '5s') t;
ALTER TABLE readings SET (
	  timescaledb.compress,
	  timescaledb.compress_orderby = 'time',
	  timescaledb.compress_segmentby = 'device'
);
SET timescaledb.enable_transparent_decompression TO false;
SELECT format('%I.%I', chunk_schema, chunk_name)::regclass AS chunk
FROM timescaledb_information.chunks
WHERE format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'readings'::regclass
LIMIT 1 \gset
-- We do some basic checks that the compressed data is the same as the
-- uncompressed. In this case, we just count the rows for each device.
SELECT device, count(*) INTO orig FROM readings GROUP BY device;
-- We should be able to set the table access method for a chunk, which
-- will automatically compress the chunk.
ALTER TABLE :chunk SET ACCESS METHOD tscompression;
-- This should compress the chunk
SELECT chunk_name FROM chunk_compression_stats('readings') WHERE compression_status='Compressed';
    chunk_name
------------------
 _hyper_1_1_chunk
(1 row)

-- Should give the same result as above
SELECT device, count(*) INTO comp FROM readings GROUP BY device;
-- Row counts for each device should match, so this should be empty.
SELECT device FROM orig JOIN comp USING (device) WHERE orig.count != comp.count;
 device
--------
(0 rows)

-- We should be able to change it back to heap.
ALTER TABLE :chunk SET ACCESS METHOD heap;
-- Should give the same result as above
SELECT device, count(*) INTO decomp FROM readings GROUP BY device;
-- Row counts for each device should match, so this should be empty.
SELECT device FROM orig JOIN decomp USING (device) WHERE orig.count != decomp.count;
 device
--------
(0 rows)
@@ -206,6 +206,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
 debug_waitpoint_enable(text)
 debug_waitpoint_id(text)
 debug_waitpoint_release(text)
+ts_compressionam_handler(internal)
 ts_now_mock()
 add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval)
 add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text)
@@ -110,7 +110,7 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "15"))
     list(APPEND TEST_FILES bgw_scheduler_control.sql)
   endif()
   list(APPEND TEST_FILES merge_compress.sql cagg_query_using_merge.sql
-       cagg_refresh_using_merge.sql)
+       cagg_refresh_using_merge.sql compression_tam.sql)
 endif()

 if((${PG_VERSION_MAJOR} GREATER_EQUAL "17"))
tsl/test/sql/compression_tam.sql (new file, 50 lines)
@@ -0,0 +1,50 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

CREATE TABLE readings(time timestamptz, location int, device int, temp float, humidity float);

SELECT create_hypertable('readings', 'time');

INSERT INTO readings (time, location, device, temp, humidity)
SELECT t, ceil(random()*10), ceil(random()*30), random()*40, random()*100
FROM generate_series('2022-06-01'::timestamptz, '2022-07-01', '5s') t;

ALTER TABLE readings SET (
	  timescaledb.compress,
	  timescaledb.compress_orderby = 'time',
	  timescaledb.compress_segmentby = 'device'
);

SET timescaledb.enable_transparent_decompression TO false;

SELECT format('%I.%I', chunk_schema, chunk_name)::regclass AS chunk
FROM timescaledb_information.chunks
WHERE format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'readings'::regclass
LIMIT 1 \gset

-- We do some basic checks that the compressed data is the same as the
-- uncompressed. In this case, we just count the rows for each device.
SELECT device, count(*) INTO orig FROM readings GROUP BY device;

-- We should be able to set the table access method for a chunk, which
-- will automatically compress the chunk.
ALTER TABLE :chunk SET ACCESS METHOD tscompression;

-- This should compress the chunk
SELECT chunk_name FROM chunk_compression_stats('readings') WHERE compression_status='Compressed';

-- Should give the same result as above
SELECT device, count(*) INTO comp FROM readings GROUP BY device;

-- Row counts for each device should match, so this should be empty.
SELECT device FROM orig JOIN comp USING (device) WHERE orig.count != comp.count;

-- We should be able to change it back to heap.
ALTER TABLE :chunk SET ACCESS METHOD heap;

-- Should give the same result as above
SELECT device, count(*) INTO decomp FROM readings GROUP BY device;

-- Row counts for each device should match, so this should be empty.
SELECT device FROM orig JOIN decomp USING (device) WHERE orig.count != decomp.count;
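A natural extra sanity check, not part of the committed test, would be to read the chunk's access method straight from the catalog. This sketch assumes the :chunk psql variable set above:

    -- Hypothetical follow-up: confirm which table AM the chunk uses
    SELECT cl.relname, am.amname
    FROM pg_class cl
    JOIN pg_am am ON am.oid = cl.relam
    WHERE cl.oid = :'chunk'::regclass;

After SET ACCESS METHOD tscompression this should report amname = tscompression, and after converting back it should report heap.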