Add UI for compress_chunks functionality

Add support for compress_chunks function.

This also adds support for compress_orderby and compress_segmentby
parameters in ALTER TABLE. These parameteres are used by the
compress_chunks function.

The parsing code will most likely be changed to use PG raw_parser
function.
This commit is contained in:
gayyappan 2019-07-25 18:30:39 -04:00 committed by Matvey Arye
parent bb89e62629
commit 44941f7bd2
31 changed files with 936 additions and 175 deletions

View File

@ -6,6 +6,7 @@ set(PRE_INSTALL_SOURCE_FILES
pre_install/types.sql # Must be before tables.sql
pre_install/tables.sql
pre_install/types.sql
pre_install/insert_data.sql
pre_install/bgw_scheduler_startup.sql
)

View File

@ -18,3 +18,7 @@ CREATE OR REPLACE FUNCTION move_chunk(
reorder_index REGCLASS=NULL,
verbose BOOLEAN=FALSE
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_move_chunk' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION compress_chunk(
chunk REGCLASS
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C VOLATILE;

View File

@ -0,0 +1,11 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
--insert data for compression_algorithm --
insert into _timescaledb_catalog.compression_algorithm( id, version, name, description) values
( 0, 1, 'COMPRESSION_ALGORITHM_NONE', 'no compression'),
( 1, 1, 'COMPRESSION_ALGORITHM_ARRAY', 'array'),
( 2, 1, 'COMPRESSION_ALGORITHM_DICTIONARY', 'dictionary'),
( 3, 1, 'COMPRESSION_ALGORITHM_GORILLA', 'gorilla'),
( 4, 1, 'COMPRESSION_ALGORITHM_DELTADELTA', 'deltadelta');

View File

@ -293,7 +293,8 @@ CREATE INDEX continuous_aggs_materialization_invalidation_log_idx
ON _timescaledb_catalog.continuous_aggs_materialization_invalidation_log (materialization_id, lowest_modified_value ASC);
/* the source of this data is the enum from the source code that lists
the algorithms */
* the algorithms. This table is NOT dumped.
*/
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_algorithm(
id SMALLINT PRIMARY KEY,
version SMALLINT NOT NULL,
@ -301,7 +302,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_algorithm(
description TEXT
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_algorithm', '');
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.hypertable_compression (
hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,

View File

@ -120,15 +120,13 @@ alter table _timescaledb_catalog.hypertable add constraint hypertable_compress_c
ALTER TABLE _timescaledb_catalog.chunk add column compressed_chunk_id integer references _timescaledb_catalog.chunk(id);
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_algorithm(
CREATE TABLE _timescaledb_catalog.compression_algorithm(
id SMALLINT PRIMARY KEY,
version SMALLINT NOT NULL,
name NAME NOT NULL,
description TEXT
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_algorithm', '');
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.hypertable_compression (
hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
attname NAME NOT NULL,
@ -190,3 +188,13 @@ CREATE TYPE _timescaledb_internal.compressed_data (
RECEIVE = _timescaledb_internal.compressed_data_recv,
SEND = _timescaledb_internal.compressed_data_send
);
--insert data for compression_algorithm --
insert into _timescaledb_catalog.compression_algorithm values
( 0, 1, 'COMPRESSION_ALGORITHM_NONE', 'no compression'),
( 1, 1, 'COMPRESSION_ALGORITHM_ARRAY', 'array'),
( 2, 1, 'COMPRESSION_ALGORITHM_DICTIONARY', 'dictionary'),
( 3, 1, 'COMPRESSION_ALGORITHM_GORILLA', 'gorilla'),
( 4, 1, 'COMPRESSION_ALGORITHM_DELTADELTA', 'deltadelta')
on conflict(id) do update set (version, name, description)
= (excluded.version, excluded.name, excluded.description);

View File

@ -17,8 +17,7 @@ set(SOURCES
constraint_aware_append.c
cross_module_fn.c
copy.c
compress_hypertable.h
compress_hypertable.c
compression_with_clause.c
dimension.c
dimension_slice.c
dimension_vector.c
@ -31,6 +30,7 @@ set(SOURCES
hypercube.c
hypertable.c
hypertable_cache.c
hypertable_compression.c
hypertable_insert.c
hypertable_restrict_info.c
indexing.c

View File

@ -1,45 +0,0 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <fmgr.h>
#include <access/htup_details.h>
#include <catalog/dependency.h>
#include <catalog/namespace.h>
#include <catalog/pg_type.h>
#include <catalog/pg_trigger.h>
#include <commands/trigger.h>
#include <storage/lmgr.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include "compat.h"
#include "compress_hypertable.h"
static const WithClauseDefinition compress_hypertable_with_clause_def[] = {
[CompressEnabled] = {
.arg_name = "compress",
.type_id = BOOLOID,
.default_val = BoolGetDatum(false),
},
[CompressSegmentBy] = {
.arg_name = "segment_by",
.type_id = TEXTARRAYOID,
},
[CompressOrderBy] = {
.arg_name = "order_by",
.type_id = TEXTOID,
},
};
WithClauseResult *
ts_compress_hypertable_set_clause_parse(const List *defelems)
{
return ts_with_clauses_parse(defelems,
compress_hypertable_with_clause_def,
TS_ARRAY_LEN(compress_hypertable_with_clause_def));
}

View File

@ -0,0 +1,213 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <fmgr.h>
#include <access/htup_details.h>
#include <catalog/dependency.h>
#include <catalog/namespace.h>
#include <catalog/pg_type.h>
#include <catalog/pg_trigger.h>
#include <commands/trigger.h>
#include <storage/lmgr.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include "compat.h"
#include "compression_with_clause.h"
static const WithClauseDefinition compress_hypertable_with_clause_def[] = {
[CompressEnabled] = {
.arg_name = "compress",
.type_id = BOOLOID,
.default_val = BoolGetDatum(false),
},
[CompressSegmentBy] = {
.arg_name = "compress_segmentby",
.type_id = TEXTOID,
},
[CompressOrderBy] = {
.arg_name = "compress_orderby",
.type_id = TEXTOID,
},
};
WithClauseResult *
ts_compress_hypertable_set_clause_parse(const List *defelems)
{
return ts_with_clauses_parse(defelems,
compress_hypertable_with_clause_def,
TS_ARRAY_LEN(compress_hypertable_with_clause_def));
}
/* strip double quotes from tokens that are names of columns */
static char *
strip_name_token(char *token)
{
int len;
if (token == NULL)
return NULL;
len = strlen(token);
if (token[0] == '"' && (len > 1) && (token[len - 1] == '"'))
{
char *newtok = palloc0(sizeof(char) * (len - 1));
strncpy(newtok, &token[1], len - 2);
return newtok;
}
return token;
}
static List *
parse_segment_collist(char *inpstr, const char *delim)
{
List *collist = NIL;
char *saveptr = NULL;
char *token = strtok_r(inpstr, delim, &saveptr);
short index = 0;
while (token)
{
char *namtoken = NULL;
CompressedParsedCol *col = (CompressedParsedCol *) palloc(sizeof(CompressedParsedCol));
col->index = index;
namtoken = strip_name_token(token);
namestrcpy(&col->colname, namtoken);
index++;
// elog(INFO, "colname is %s %d", col->colname, col->index);
collist = lappend(collist, (void *) col);
token = strtok_r(NULL, delim, &saveptr);
}
return collist;
}
#define CHKTOKEN(token, str) \
(token && (strncmp(token, str, strlen(str)) == 0) && (strlen(str) == strlen(token)))
#define PRINT_UNEXPECTED_TOKEN_MSG(token2) \
ereport(ERROR, \
(errcode(ERRCODE_SYNTAX_ERROR), \
errmsg("unexpected token %s in compress_orderby list ", token2)))
static CompressedParsedCol *
parse_orderelement(char *elttoken, const char *spcdelim, short index)
{
bool getnext = false;
bool neednullstok = false;
CompressedParsedCol *col = (CompressedParsedCol *) palloc(sizeof(CompressedParsedCol));
char *saveptr2 = NULL;
char *namtoken;
char *token2 = strtok_r(elttoken, spcdelim, &saveptr2);
col->index = index;
namtoken = strip_name_token(token2);
namestrcpy(&col->colname, namtoken);
/* default for sort is asc and nulls first */
col->asc = true;
col->nullsfirst = true;
token2 = strtok_r(NULL, spcdelim, &saveptr2);
if (CHKTOKEN(token2, "asc"))
{
col->asc = true;
getnext = true;
}
else if (CHKTOKEN(token2, "desc"))
{
col->asc = false;
/* if we have desceneding then nulls last is default unless user specifies otherwise */
col->nullsfirst = false;
getnext = true;
}
if (getnext)
{
token2 = strtok_r(NULL, spcdelim, &saveptr2);
}
if (CHKTOKEN(token2, "nulls"))
{
token2 = strtok_r(NULL, spcdelim, &saveptr2);
neednullstok = true;
}
else if (token2) // we have a token but not nay of the expected ones
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unexpected token %s in compress_orderby list ", token2)));
}
if (CHKTOKEN(token2, "first"))
{
col->nullsfirst = true;
}
else if (CHKTOKEN(token2, "last"))
{
col->nullsfirst = false;
}
else if (neednullstok)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("expect first/last after nulls in compress_orderby list ")));
}
else if (token2) // we have a token but not nay of the expected ones
{
PRINT_UNEXPECTED_TOKEN_MSG(token2);
}
// any more tokens left?
token2 = strtok_r(NULL, spcdelim, &saveptr2);
if (token2)
{
PRINT_UNEXPECTED_TOKEN_MSG(token2);
}
return col;
}
/* compress_orderby = `<elt>,<elt>:...'
<elt> = <col_name> [asc|desc] [nulls (first|last)]
*/
static List *
parse_order_collist(char *inpstr, const char *delim)
{
List *collist = NIL;
char *saveptr = NULL;
char *elttoken = strtok_r(inpstr, delim, &saveptr);
short index = 0;
char spcdelim = ' ';
while (elttoken)
{
CompressedParsedCol *col = parse_orderelement(elttoken, &spcdelim, index);
collist = lappend(collist, (void *) col);
elttoken = strtok_r(NULL, delim, &saveptr);
index++;
}
return collist;
}
/* returns List of CompressedParsedCol
* compress_segmentby = `col1,col2,col3`
*/
List *
ts_compress_hypertable_parse_segment_by(WithClauseResult *parsed_options)
{
if (parsed_options[CompressSegmentBy].is_default == false)
{
Datum textarg = parsed_options[CompressSegmentBy].parsed;
return parse_segment_collist(TextDatumGetCString(textarg), ",");
}
else
return NIL;
}
/* returns List of CompressedParsedCol
* E.g. timescaledb.compress_orderby = 'col1 asc nulls first,col2 desc,col3'
*/
List *
ts_compress_hypertable_parse_order_by(WithClauseResult *parsed_options)
{
if (parsed_options[CompressOrderBy].is_default == false)
{
Datum textarg = parsed_options[CompressOrderBy].parsed;
return parse_order_collist(TextDatumGetCString(textarg), ",");
}
else
return NIL;
}

View File

@ -3,8 +3,8 @@
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef TIMESCALEDB_COMPRESS_HYPERTABLE_H
#define TIMESCALEDB_COMPRESS_HYPERTABLE_H
#ifndef TIMESCALEDB_COMPRESSION_WITH_CLAUSE_H
#define TIMESCALEDB_COMPRESSION_WITH_CLAUSE_H
#include <postgres.h>
#include <catalog/pg_type.h>
@ -20,6 +20,16 @@ typedef enum CompressHypertableOption
CompressOrderBy,
} CompressHypertableOption;
typedef struct
{
short index;
NameData colname;
bool nullsfirst;
bool asc;
} CompressedParsedCol;
WithClauseResult *ts_compress_hypertable_set_clause_parse(const List *defelems);
extern TSDLLEXPORT List *ts_compress_hypertable_parse_segment_by(WithClauseResult *parsed_options);
extern TSDLLEXPORT List *ts_compress_hypertable_parse_order_by(WithClauseResult *parsed_options);
#endif

View File

@ -24,6 +24,7 @@ TS_FUNCTION_INFO_V1(ts_partialize_agg);
TS_FUNCTION_INFO_V1(ts_finalize_agg_sfunc);
TS_FUNCTION_INFO_V1(ts_finalize_agg_ffunc);
TS_FUNCTION_INFO_V1(ts_continuous_agg_invalidation_trigger);
TS_FUNCTION_INFO_V1(ts_compress_chunk);
TS_FUNCTION_INFO_V1(ts_compressed_data_decompress_forward);
TS_FUNCTION_INFO_V1(ts_compressed_data_decompress_reverse);
@ -194,6 +195,12 @@ ts_array_compressor_finish(PG_FUNCTION_ARGS)
return ts_cm_functions->array_compressor_finish(fcinfo);
}
Datum
ts_compress_chunk(PG_FUNCTION_ARGS)
{
PG_RETURN_DATUM(ts_cm_functions->compress_chunk(fcinfo));
}
/*
* casting a function pointer to a pointer of another type is undefined
* behavior, so we need one of these for every function type we have
@ -377,6 +384,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.array_compressor_append = error_no_default_fn_pg_community,
.array_compressor_finish = error_no_default_fn_pg_community,
.process_compress_table = process_compress_table_default,
.compress_chunk = error_no_default_fn_pg_enterprise,
};
TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;

View File

@ -88,6 +88,7 @@ typedef struct CrossModuleFunctions
PGFunction array_compressor_finish;
bool (*process_compress_table)(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options);
PGFunction compress_chunk;
} CrossModuleFunctions;
extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;

View File

@ -30,6 +30,7 @@
typedef struct SubspaceStore SubspaceStore;
typedef struct Chunk Chunk;
#define TS_HYPERTABLE_HAS_COMPRESSION_ON(ht) (ht->fd.compressed_hypertable_id > 0)
typedef struct Hypertable
{
FormData_hypertable fd;

View File

@ -0,0 +1,110 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include "hypertable.h"
#include "hypertable_cache.h"
#include "catalog.h"
#include "hypertable_compression.h"
#include "scanner.h"
#include "scan_iterator.h"
static void
hypertable_compression_fill_from_tuple(FormData_hypertable_compression *fd, TupleInfo *ti)
{
HeapTuple tuple = ti->tuple;
TupleDesc desc = ti->desc;
Datum val;
bool isnull;
memcpy((void *) fd, GETSTRUCT(tuple), sizeof(FormData_hypertable_compression));
/* copy the part that could have null values explictly */
val = heap_getattr(tuple, Anum_hypertable_compression_segmentby_column_index, desc, &isnull);
if (isnull)
fd->segmentby_column_index = 0;
else
fd->segmentby_column_index = DatumGetInt16(val);
val = heap_getattr(tuple, Anum_hypertable_compression_orderby_column_index, desc, &isnull);
if (isnull)
fd->orderby_column_index = 0;
else
{
fd->orderby_column_index = DatumGetInt16(val);
val = heap_getattr(tuple, Anum_hypertable_compression_orderby_asc, desc, &isnull);
fd->orderby_asc = BoolGetDatum(val);
val = heap_getattr(tuple, Anum_hypertable_compression_orderby_nullsfirst, desc, &isnull);
fd->orderby_nullsfirst = BoolGetDatum(val);
}
}
void
hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls)
{
memset(nulls, 0, sizeof(bool) * Natts_hypertable_compression);
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)] =
Int32GetDatum(fd->hypertable_id);
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_attname)] =
NameGetDatum(&fd->attname);
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_algo_id)] =
Int16GetDatum(fd->algo_id);
if (fd->segmentby_column_index > 0)
{
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_segmentby_column_index)] =
Int16GetDatum(fd->segmentby_column_index);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_segmentby_column_index)] = true;
}
if (fd->orderby_column_index > 0)
{
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_column_index)] =
Int16GetDatum(fd->orderby_column_index);
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_asc)] =
BoolGetDatum(fd->orderby_asc);
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_nullsfirst)] =
BoolGetDatum(fd->orderby_nullsfirst);
}
else
{
nulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_column_index)] = true;
nulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_asc)] = true;
nulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_nullsfirst)] = true;
}
}
/* returns length of list and fills passed in list with pointers
* to FormData_hypertable_compression
*/
List *
get_hypertablecompression_info(int32 htid)
{
List *fdlist = NIL;
FormData_hypertable_compression *colfd = NULL;
ScanIterator iterator =
ts_scan_iterator_create(HYPERTABLE_COMPRESSION, AccessShareLock, CurrentMemoryContext);
iterator.ctx.index =
catalog_get_index(ts_catalog_get(), HYPERTABLE_COMPRESSION, HYPERTABLE_COMPRESSION_PKEY);
ts_scan_iterator_scan_key_init(&iterator,
Anum_hypertable_compression_pkey_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(htid));
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
FormData_hypertable_compression *data =
(FormData_hypertable_compression *) GETSTRUCT(ti->tuple);
if (data->hypertable_id != htid)
continue;
colfd = palloc0(sizeof(FormData_hypertable_compression));
hypertable_compression_fill_from_tuple(colfd, ti);
fdlist = lappend(fdlist, colfd);
}
return fdlist;
}

View File

@ -0,0 +1,19 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef TIMESCALEDB_HYPERTABLE_COMPRESSION_H
#define TIMESCALEDB_HYPERTABLE_COMPRESSION_H
#include <postgres.h>
#include <catalog/pg_type.h>
#include <catalog.h>
#include <chunk.h>
extern TSDLLEXPORT List *get_hypertablecompression_info(int32 htid);
extern TSDLLEXPORT void
hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls);
#endif

View File

@ -62,7 +62,7 @@
#include "with_clause_parser.h"
#include "cross_module_fn.h"
#include "continuous_agg.h"
#include "compress_hypertable.h"
#include "compression_with_clause.h"
#include "partitioning.h"
#include "cross_module_fn.h"

View File

@ -21,6 +21,7 @@ ORDER BY proname;
attach_tablespace
chunk_relation_size
chunk_relation_size_pretty
compress_chunk
create_hypertable
detach_tablespace
detach_tablespaces

View File

@ -524,10 +524,11 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
timescaledb_information.drop_chunks_policies
timescaledb_information.license
timescaledb_information.hypertable
_timescaledb_catalog.compression_algorithm
_timescaledb_internal.bgw_policy_chunk_stats
_timescaledb_internal.bgw_job_stat
_timescaledb_catalog.tablespace_id_seq
(10 rows)
(11 rows)
-- Make sure we can't run our restoring functions as a normal perm user as that would disable functionality for the whole db
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER

View File

@ -2,6 +2,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/array.c
${CMAKE_CURRENT_SOURCE_DIR}/compression.c
${CMAKE_CURRENT_SOURCE_DIR}/create.c
${CMAKE_CURRENT_SOURCE_DIR}/compress_utils.c
${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c
${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c
${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c

View File

@ -0,0 +1,117 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
/* This file contains the implementation for SQL utility functions that
* compress and decompress chunks
*/
#include <postgres.h>
#include <miscadmin.h>
#include <storage/lmgr.h>
#include <utils/elog.h>
#include "chunk.h"
#include "errors.h"
#include "hypertable.h"
#include "hypertable_cache.h"
#include "hypertable_compression.h"
#include "create.h"
#include "compress_utils.h"
#include "compression.h"
typedef struct CompressChunkCxt
{
Hypertable *srcht;
Chunk *srcht_chunk; /* chunk from srcht */
Hypertable *compress_ht; /*compressed table for srcht */
} CompressChunkCxt;
static void
compresschunkcxt_init(CompressChunkCxt *cxt, Cache *hcache, Oid hypertable_relid, Oid chunk_relid)
{
Hypertable *srcht = ts_hypertable_cache_get_entry(hcache, hypertable_relid);
Hypertable *compress_ht;
Chunk *srcchunk;
if (srcht == NULL)
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("table \"%s\" is not a hypertable", get_rel_name(hypertable_relid))));
ts_hypertable_permissions_check(srcht->main_table_relid, GetUserId());
if (!TS_HYPERTABLE_HAS_COMPRESSION_ON(srcht))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("chunks can be compressed only if compression property is set on the "
"hypertable"),
errhint("use ALTER TABLE with timescaledb.compression option ")));
}
compress_ht = ts_hypertable_get_by_id(srcht->fd.compressed_hypertable_id);
if (compress_ht == NULL)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("missing compress hypertable")));
/* user has to be the owner of the compression table too */
ts_hypertable_permissions_check(compress_ht->main_table_relid, GetUserId());
if (!srcht->space) // something is wrong
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("missing hyperspace for hypertable")));
/* refetch the srcchunk with all attributes filled in */
srcchunk = ts_chunk_get_by_relid(chunk_relid, srcht->space->num_dimensions, true);
cxt->srcht = srcht;
cxt->compress_ht = compress_ht;
cxt->srcht_chunk = srcchunk;
return;
}
static void
compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
{
CompressChunkCxt cxt;
Chunk *compress_ht_chunk;
Cache *hcache;
ListCell *lc;
List *htcols_list = NIL;
const ColumnCompressionInfo **colinfo_array;
int i = 0, htcols_listlen;
hcache = ts_hypertable_cache_pin();
compresschunkcxt_init(&cxt, hcache, hypertable_relid, chunk_relid);
/* acquire locks on src and compress hypertable and src chunk */
LockRelationOid(cxt.srcht->main_table_relid, AccessShareLock);
LockRelationOid(cxt.compress_ht->main_table_relid, AccessShareLock);
LockRelationOid(cxt.srcht_chunk->table_id, AccessShareLock); /*upgrade when needed */
// get compression properties for hypertable
htcols_list = get_hypertablecompression_info(cxt.srcht->fd.id);
htcols_listlen = list_length(htcols_list);
// create compressed chunk DDL and compress the data
compress_ht_chunk = create_compress_chunk_table(cxt.compress_ht, cxt.srcht_chunk);
/* convert list to array of pointers for compress_chunk */
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);
foreach (lc, htcols_list)
{
FormData_hypertable_compression *fd = (FormData_hypertable_compression *) lfirst(lc);
colinfo_array[i++] = fd;
}
compress_chunk(cxt.srcht_chunk->table_id,
compress_ht_chunk->table_id,
colinfo_array,
htcols_listlen);
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id, false);
ts_cache_release(hcache);
}
Datum
tsl_compress_chunk(PG_FUNCTION_ARGS)
{
Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
Chunk *srcchunk = ts_chunk_get_by_relid(chunk_id, 0, true);
if (srcchunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("chunk is already compressed")));
}
compress_chunk_impl(srcchunk->hypertable_relid, chunk_id);
PG_RETURN_VOID();
}

View File

@ -0,0 +1,11 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_COMPRESSION_UTILS_H
#define TIMESCALEDB_TSL_COMPRESSION_UTILS_H
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
#endif // TIMESCALEDB_TSL_COMPRESSION_UTILS_H

View File

@ -34,6 +34,8 @@
#include "gorilla.h"
#define MAX_ROWS_PER_COMPRESSION 1000
#define COMPRESSIONCOL_IS_SEGMENT_BY(col) (col->segmentby_column_index > 0)
#define COMPRESSIONCOL_IS_ORDER_BY(col) (col->orderby_column_index > 0)
static const CompressionAlgorithmDefinition definitions[_END_COMPRESSION_ALGORITHMS] = {
[COMPRESSION_ALGORITHM_ARRAY] = ARRAY_ALGORITHM_DEFINITION,
@ -181,12 +183,10 @@ compress_chunk_populate_keys(Oid in_table, const ColumnCompressionInfo **columns
for (i = 0; i < n_columns; i++)
{
bool is_segmentby = columns[i]->segmentby_column_index >= 0;
bool is_orderby = columns[i]->orderby_column_index >= 0;
if (is_segmentby)
if (COMPRESSIONCOL_IS_SEGMENT_BY(columns[i]))
n_segment_keys += 1;
if (is_segmentby || is_orderby)
if (COMPRESSIONCOL_IS_SEGMENT_BY(columns[i]) || COMPRESSIONCOL_IS_ORDER_BY(columns[i]))
*n_keys_out += 1;
}
@ -195,13 +195,14 @@ compress_chunk_populate_keys(Oid in_table, const ColumnCompressionInfo **columns
for (i = 0; i < n_columns; i++)
{
const ColumnCompressionInfo *column = columns[i];
int16 segment_offset = column->segmentby_column_index;
int16 orderby_offset = column->orderby_column_index;
/* valid values for segmentby_columnn_index and orderby_column_index
are > 0 */
int16 segment_offset = column->segmentby_column_index - 1;
int16 orderby_offset = column->orderby_column_index - 1;
AttrNumber compressed_att;
if (segment_offset >= 0)
if (COMPRESSIONCOL_IS_SEGMENT_BY(column))
(*keys_out)[segment_offset] = column;
if (columns[i]->orderby_column_index >= 0)
else if (COMPRESSIONCOL_IS_ORDER_BY(column))
(*keys_out)[n_segment_keys + orderby_offset] = column;
compressed_att = get_attnum(in_table, NameStr(column->attname));
@ -294,11 +295,11 @@ compress_chunk_populate_sort_info_for_column(Oid table, const ColumnCompressionI
*att_nums = att_tup->attnum;
*collation = att_tup->attcollation;
*nulls_first = column->segmentby_column_index < 0 && column->orderby_nullsfirst;
*nulls_first = (!(COMPRESSIONCOL_IS_SEGMENT_BY(column))) && column->orderby_nullsfirst;
tentry = lookup_type_cache(att_tup->atttypid, TYPECACHE_LT_OPR | TYPECACHE_GT_OPR);
if (column->segmentby_column_index >= 0 || column->orderby_asc)
if (COMPRESSIONCOL_IS_SEGMENT_BY(column) || column->orderby_asc)
*sort_operator = tentry->lt_opr;
else
*sort_operator = tentry->gt_opr;
@ -362,8 +363,7 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
row_compressor->uncompressed_col_to_compressed_col[in_column_offset] =
AttrNumberGetAttrOffset(compressed_colnum);
Assert(AttrNumberGetAttrOffset(compressed_colnum) < num_compressed_columns);
if (compression_info->segmentby_column_index < 0)
if (!COMPRESSIONCOL_IS_SEGMENT_BY(compression_info))
{
*column = (PerColumn){
.compressor = compressor_for_algorithm_and_type(compression_info->algo_id,

View File

@ -14,6 +14,7 @@
#include <commands/tablecmds.h>
#include <commands/tablespace.h>
#include <nodes/makefuncs.h>
#include <storage/lmgr.h>
#include <utils/builtins.h>
#include <utils/rel.h>
@ -25,9 +26,24 @@
#include "trigger.h"
#include "scan_iterator.h"
#include "hypertable_cache.h"
#include "compression_with_clause.h"
#include "compression.h"
#include "hypertable_compression.h"
/* entrypoint
* tsl_process_compress_table : is the entry point.
*/
typedef struct CompressColInfo
{
int numcols;
FormData_hypertable_compression
*col_meta; /* metadata about columns from src hypertable that will be compressed*/
List *coldeflist; /*list of ColumnDef for the compressed column */
} CompressColInfo;
static void compresscolinfo_init(CompressColInfo *cc, Oid srctbl_relid, List *segmentby_cols,
List *orderby_cols);
static void compresscolinfo_add_catalog_entries(CompressColInfo *compress_cols, int32 htid);
#define PRINT_COMPRESSION_TABLE_NAME(buf, prefix, hypertable_id) \
do \
@ -41,20 +57,59 @@
} \
} while (0);
static void test_compresschunk(Hypertable *ht, int32 compress_htid);
#define COMPRESSEDDATA_TYPE_NAME "_timescaledb_internal.compressed_data"
/* return ColumnDef list - dups columns of passed in relid
* new columns have BYTEA type
static enum CompressionAlgorithms
get_default_algorithm_id(Oid typeoid)
{
switch (typeoid)
{
case INT4OID:
case INT2OID:
case INT8OID:
case INTERVALOID:
case DATEOID:
case TIMESTAMPOID:
{
return COMPRESSION_ALGORITHM_DELTADELTA;
}
case FLOAT4OID:
case FLOAT8OID:
{
return COMPRESSION_ALGORITHM_GORILLA;
}
case NUMERICOID:
{
return COMPRESSION_ALGORITHM_ARRAY;
}
case TEXTOID:
case CHAROID:
{
return COMPRESSION_ALGORITHM_DICTIONARY;
}
default:
return COMPRESSION_ALGORITHM_DICTIONARY;
}
}
/*
* return the columndef list for compressed hypertable.
* we do this by getting the source hypertable's attrs,
* 1. validate the segmentby cols and orderby cols exists in this list and
* 2. create the columndefs for the new compressed hypertable
* segmentby_cols have same datatype as the original table
* all other cols have COMPRESSEDDATA_TYPE type
*/
static List *
get_compress_columndef_from_table(Oid srctbl_relid)
static void
compresscolinfo_init(CompressColInfo *cc, Oid srctbl_relid, List *segmentby_cols,
List *orderby_cols)
{
Relation rel;
TupleDesc tupdesc;
int attno;
List *collist = NIL;
int i, colno, attno;
int32 *segorder_colindex;
int seg_attnolen = 0;
ListCell *lc;
const Oid compresseddata_oid =
DatumGetObjectId(DirectFunctionCall1(regtypein, CStringGetDatum(COMPRESSEDDATA_TYPE_NAME)));
@ -62,28 +117,129 @@ get_compress_columndef_from_table(Oid srctbl_relid)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("type \"%s\" does not exist", COMPRESSEDDATA_TYPE_NAME)));
/* Get the tupledesc and turn it over to expandTupleDesc */
seg_attnolen = list_length(segmentby_cols);
rel = relation_open(srctbl_relid, AccessShareLock);
segorder_colindex = palloc0(sizeof(int32) * (rel->rd_att->natts));
tupdesc = rel->rd_att;
i = 1;
foreach (lc, segmentby_cols)
{
CompressedParsedCol *col = (CompressedParsedCol *) lfirst(lc);
AttrNumber col_attno = attno_find_by_attname(tupdesc, &col->colname);
if (col_attno == InvalidAttrNumber)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("column %s in compress_segmentby list does not exist",
NameStr(col->colname))));
}
segorder_colindex[col_attno - 1] = i++;
}
/* the column indexes are numbered as seg_attnolen + <orderby_index>
*/
Assert(seg_attnolen == (i - 1));
foreach (lc, orderby_cols)
{
CompressedParsedCol *col = (CompressedParsedCol *) lfirst(lc);
AttrNumber col_attno = attno_find_by_attname(tupdesc, &col->colname);
if (col_attno == InvalidAttrNumber)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("column %s in compress_orderby list does not exist",
NameStr(col->colname))));
}
/* check if orderby_cols and segmentby_cols are distinct */
if (segorder_colindex[col_attno - 1] != 0)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot use the same column %s in compress_orderby and "
"compress_segmentby",
NameStr(col->colname))));
}
segorder_colindex[col_attno - 1] = i++;
}
cc->numcols = 0;
cc->col_meta = palloc0(sizeof(FormData_hypertable_compression) * tupdesc->natts);
cc->coldeflist = NIL;
colno = 0;
for (attno = 0; attno < tupdesc->natts; attno++)
{
Oid attroid = InvalidOid;
Form_pg_attribute attr = TupleDescAttr(tupdesc, attno);
if (!attr->attisdropped)
ColumnDef *coldef;
if (attr->attisdropped)
continue;
namestrcpy(&cc->col_meta[colno].attname, NameStr(attr->attname));
if (segorder_colindex[attno] > 0)
{
ColumnDef *col = makeColumnDef(NameStr(attr->attname),
compresseddata_oid,
-1 /*typmod*/,
0 /*collation*/);
collist = lappend(collist, col);
if (segorder_colindex[attno] <= seg_attnolen)
{
attroid = attr->atttypid; /*segment by columns have original type */
cc->col_meta[colno].segmentby_column_index = segorder_colindex[attno];
}
else
{
int orderby_index = segorder_colindex[attno] - seg_attnolen;
CompressedParsedCol *ordercol = list_nth(orderby_cols, orderby_index - 1);
cc->col_meta[colno].orderby_column_index = orderby_index;
cc->col_meta[colno].orderby_asc = ordercol->asc;
cc->col_meta[colno].orderby_nullsfirst = ordercol->nullsfirst;
}
}
if (attroid == InvalidOid)
{
attroid = compresseddata_oid; /* default type for column */
cc->col_meta[colno].algo_id = get_default_algorithm_id(attr->atttypid);
}
else
{
cc->col_meta[colno].algo_id = 0; // invalid algo number
}
coldef = makeColumnDef(NameStr(attr->attname), attroid, -1 /*typmod*/, 0 /*collation*/);
cc->coldeflist = lappend(cc->coldeflist, coldef);
colno++;
}
cc->numcols = colno;
relation_close(rel, AccessShareLock);
return collist;
}
/* prevent concurrent transactions from inserting into
* hypertable_compression for the same table, acquire the lock but don't free
* here
* i.e. 2 concurrent ALTER TABLE to compressed will not succeed.
*/
static void
compresscolinfo_add_catalog_entries(CompressColInfo *compress_cols, int32 htid)
{
Catalog *catalog = ts_catalog_get();
Relation rel;
Datum values[Natts_hypertable_compression];
bool nulls[Natts_hypertable_compression] = { false };
TupleDesc desc;
int i;
CatalogSecurityContext sec_ctx;
rel = heap_open(catalog_get_table_id(catalog, HYPERTABLE_COMPRESSION), RowExclusiveLock);
desc = RelationGetDescr(rel);
for (i = 0; i < compress_cols->numcols; i++)
{
FormData_hypertable_compression *fd = &compress_cols->col_meta[i];
fd->hypertable_id = htid;
hypertable_compression_fill_tuple_values(fd, &values[0], &nulls[0]);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
}
heap_close(rel, NoLock); /*lock will be released at end of transaction only*/
}
static int32
create_compression_table(Oid relid, Oid owner)
create_compression_table(Oid owner, CompressColInfo *compress_cols)
{
ObjectAddress tbladdress;
char relnamebuf[NAMEDATALEN];
@ -94,12 +250,10 @@ create_compression_table(Oid relid, Oid owner)
CreateStmt *create;
RangeVar *compress_rel;
List *collist;
int32 compress_hypertable_id;
collist = get_compress_columndef_from_table(relid);
create = makeNode(CreateStmt);
create->tableElts = collist;
create->tableElts = compress_cols->coldeflist;
create->inhRelations = NIL;
create->ofTypename = NULL;
create->constraints = NIL;
@ -128,14 +282,12 @@ create_compression_table(Oid relid, Oid owner)
return compress_hypertable_id;
}
/* this function will change in the follow up PR. Please do not review */
static Chunk *
create_compress_chunk(Hypertable *compress_ht, int32 compress_hypertable_id, Chunk *src_chunk)
Chunk *
create_compress_chunk_table(Hypertable *compress_ht, Chunk *src_chunk)
{
Hyperspace *hs = compress_ht->space;
Catalog *catalog = ts_catalog_get();
CatalogSecurityContext sec_ctx;
// Hypercube *cube;
Chunk *compress_chunk;
/* Create a new chunk based on the hypercube */
@ -150,7 +302,7 @@ create_compress_chunk(Hypertable *compress_ht, int32 compress_hypertable_id, Chu
namestrcpy(&compress_chunk->fd.schema_name, INTERNAL_SCHEMA_NAME);
snprintf(compress_chunk->fd.table_name.data,
NAMEDATALEN,
"compress_%s_%d_chunk",
"compress%s_%d_chunk",
NameStr(compress_ht->fd.associated_table_prefix),
compress_chunk->fd.id);
compress_chunk->constraints = NULL;
@ -161,15 +313,12 @@ create_compress_chunk(Hypertable *compress_ht, int32 compress_hypertable_id, Chu
compress_chunk->table_id = ts_chunk_create_table(compress_chunk, compress_ht);
if (!OidIsValid(compress_chunk->table_id))
elog(ERROR, "could not create chunk table");
elog(ERROR, "could not create compress chunk table");
/* compressed chunk has no constraints. But inherits indexes and triggers
* from the compressed hypertable
*/
/* Create the chunk's constraints, triggers, and indexes */
/* ts_chunk_constraints_create(compress_chunk->constraints,
compress_chunk->table_id,
compress_chunk->fd.id,
compress_chunk->hypertable_relid,
compress_chunk->fd.hypertable_id);
*/
ts_trigger_create_all_on_chunk(compress_ht, compress_chunk);
ts_chunk_index_create_all(compress_chunk->fd.hypertable_id,
@ -191,45 +340,22 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options)
{
int32 compress_htid;
struct CompressColInfo compress_cols;
Oid ownerid = ts_rel_get_owner(ht->main_table_relid);
List *segmentby_cols = ts_compress_hypertable_parse_segment_by(with_clause_options);
List *orderby_cols = ts_compress_hypertable_parse_order_by(with_clause_options);
/* we need an AccessShare lock on the hypertable so that there are
* no DDL changes while we create the compressed hypertable
*/
LockRelationOid(ht->main_table_relid, AccessShareLock);
compresscolinfo_init(&compress_cols, ht->main_table_relid, segmentby_cols, orderby_cols);
compress_htid = create_compression_table(ht->main_table_relid, ownerid);
compress_htid = create_compression_table(ownerid, &compress_cols);
/* block concurrent DDL that creates same compressed hypertable*/
LockRelationOid(catalog_get_table_id(ts_catalog_get(), HYPERTABLE), RowExclusiveLock);
ts_hypertable_set_compressed_id(ht, compress_htid);
// TODO remove this after we have compress_chunks function
test_compresschunk(ht, compress_htid);
compresscolinfo_add_catalog_entries(&compress_cols, ht->fd.id);
/* do not release any locks, will get released by xact end */
return true;
}
static List *
get_chunk_ids(int32 hypertable_id)
{
List *chunk_ids = NIL;
ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
FormData_chunk *form = (FormData_chunk *) GETSTRUCT(ts_scan_iterator_tuple(&iterator));
if (form->hypertable_id == hypertable_id)
chunk_ids = lappend_int(chunk_ids, form->id);
}
return chunk_ids;
}
static void
test_compresschunk(Hypertable *ht, int32 compress_htid)
{
Cache *hcache = ts_hypertable_cache_pin();
Hypertable *compress_ht = ts_hypertable_cache_get_entry_by_id(hcache, compress_htid);
// compress chunk from origin table */
List *ht_chks = get_chunk_ids(ht->fd.id);
ListCell *lc;
foreach (lc, ht_chks)
{
int chkid = lfirst_int(lc);
Chunk *src_chunk = ts_chunk_get_by_id(chkid, 0, true);
Chunk *compress_chunk = create_compress_chunk(compress_ht, compress_ht->fd.id, src_chunk);
ts_chunk_set_compressed_chunk(src_chunk, compress_chunk->fd.id, false);
}
ts_cache_release(hcache);
}

View File

@ -10,8 +10,9 @@
#include "with_clause_parser.h"
#include "hypertable.h"
#include "chunk.h"
bool tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options);
Chunk *create_compress_chunk_table(Hypertable *compress_ht, Chunk *src_chunk);
#endif /* TIMESCALEDB_TSL_COMPRESSION_CREATE_H */

View File

@ -33,6 +33,7 @@
#include "process_utility.h"
#include "hypertable.h"
#include "compression/create.h"
#include "compression/compress_utils.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
@ -105,6 +106,7 @@ CrossModuleFunctions tsl_cm_functions = {
.array_compressor_append = tsl_array_compressor_append,
.array_compressor_finish = tsl_array_compressor_finish,
.process_compress_table = tsl_process_compress_table,
.compress_chunk = tsl_compress_chunk,
};
TS_FUNCTION_INFO_V1(ts_module_init);

View File

@ -15,27 +15,30 @@ CREATE OR REPLACE FUNCTION ts_decompress_table(in_table REGCLASS, out_table REGC
\set ECHO errors
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
-- column name, algorithm, idx, asc, nulls_first
--no sgement_byindex (use 0 to indicate that)
CREATE FUNCTION ord(TEXT, INT, INT, BOOL = true, BOOL = false)
RETURNS _timescaledb_catalog.hypertable_compression
AS $$
SELECT (1, $1, $2::SMALLINT, -1, $3::SMALLINT, $4, $5)::_timescaledb_catalog.hypertable_compression
SELECT (1, $1, $2::SMALLINT, 0, $3::SMALLINT+1, $4, $5)::_timescaledb_catalog.hypertable_compression
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- column name, idx, asc, nulls_first
-- no orderby_index. use 0 to indicate that.
CREATE FUNCTION seg(TEXT, INT, BOOL = true, BOOL = false)
RETURNS _timescaledb_catalog.hypertable_compression
AS $$
SELECT (1, $1, 0, $2::SMALLINT, -1, $3, $4)::_timescaledb_catalog.hypertable_compression
SELECT (1, $1, 0, $2::SMALLINT+1, 0, $3, $4)::_timescaledb_catalog.hypertable_compression
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- column name, algorithm
--no orderby or segment by index (use 0 to indicate that)
CREATE FUNCTION com(TEXT, INT)
RETURNS _timescaledb_catalog.hypertable_compression
AS $$
SELECT (1, $1, $2::SMALLINT, -1, -1, true, false)::_timescaledb_catalog.hypertable_compression
SELECT (1, $1, $2::SMALLINT, 0, 0, true, false)::_timescaledb_catalog.hypertable_compression
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
SELECT * FROM ord('time', 4, 0);
hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst
---------------+---------+--------------------------+------------------------+----------------------+-------------+--------------------
1 | time | 4 | -1 | 0 | t | f
1 | time | 4 | 0 | 1 | t | f
(1 row)
CREATE TABLE uncompressed(
@ -414,6 +417,20 @@ SELECT ARRAY[ord('device', :deltadelta, 0), com('b', :deltadelta), com('time', :
b | device | time | b | device | t
---+--------+------+---+--------+---
(0 rows)
ts_decompress_table
---------------------
(1 row)
?column? | count
---------------------------------------------------------------------------+-------
Number of rows different between original and decompress_table (expect 0) | 0
(1 row)
b | device | time | b | device | time
---+--------+------+---+--------+------
(0 rows)
TRUNCATE compressed;

View File

@ -3,7 +3,7 @@
-- LICENSE-TIMESCALE for a copy of the license.
--TEST1 ---
--basic test with count
create table foo (a integer, b integer, c integer);
create table foo (a integer, b integer, c integer, d integer);
select table_name from create_hypertable('foo', 'a', chunk_time_interval=> 10);
NOTICE: adding not-null constraint to column "a"
table_name
@ -11,11 +11,11 @@ NOTICE: adding not-null constraint to column "a"
foo
(1 row)
insert into foo values( 3 , 16 , 20);
insert into foo values( 10 , 10 , 20);
insert into foo values( 20 , 11 , 20);
insert into foo values( 30 , 12 , 20);
alter table foo set (timescaledb.compress);
insert into foo values( 3 , 16 , 20, 11);
insert into foo values( 10 , 10 , 20, 120);
insert into foo values( 20 , 11 , 20, 13);
insert into foo values( 30 , 12 , 20, 14);
alter table foo set (timescaledb.compress, timescaledb.compress_segmentby = 'a,b', timescaledb.compress_orderby = 'c desc, d asc nulls last');
select id, schema_name, table_name, compressed, compressed_hypertable_id from
_timescaledb_catalog.hypertable order by id;
id | schema_name | table_name | compressed | compressed_hypertable_id
@ -24,15 +24,38 @@ _timescaledb_catalog.hypertable order by id;
2 | _timescaledb_internal | _compressed_hypertable_2 | t |
(2 rows)
-- should error out --
\set ON_ERROR_STOP 0
ALTER TABLE foo ALTER b SET NOT NULL, set (timescaledb.compress);
ERROR: ALTER TABLE <hypertable> SET does not support multiple clauses
\set ON_ERROR_STOP 1
ALTER TABLE foo ALTER b SET NOT NULL;
select attname, attnotnull from pg_attribute where attrelid = (select oid from pg_class where relname like 'foo') and attname like 'b';
attname | attnotnull
---------+------------
b | t
select * from _timescaledb_catalog.hypertable_compression order by hypertable_id, attname;
hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst
---------------+---------+--------------------------+------------------------+----------------------+-------------+--------------------
1 | a | 0 | 1 | | |
1 | b | 0 | 2 | | |
1 | c | 4 | | 1 | f | f
1 | d | 4 | | 2 | t | f
(4 rows)
select compress_chunk( '_timescaledb_internal._hyper_1_2_chunk');
compress_chunk
----------------
(1 row)
select compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');
compress_chunk
----------------
(1 row)
select ch1.id, ch1.schema_name, ch1.table_name , ch2.table_name as compress_table
from
_timescaledb_catalog.chunk ch1, _timescaledb_catalog.chunk ch2
where ch1.compressed_chunk_id = ch2.id;
id | schema_name | table_name | compress_table
----+-----------------------+------------------+--------------------------
2 | _timescaledb_internal | _hyper_1_2_chunk | compress_hyper_2_5_chunk
1 | _timescaledb_internal | _hyper_1_1_chunk | compress_hyper_2_6_chunk
(2 rows)
\set ON_ERROR_STOP 0
--cannot recompress the chunk the second time around
select compress_chunk( '_timescaledb_internal._hyper_1_2_chunk');
ERROR: chunk is already compressed

View File

@ -0,0 +1,69 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\set ON_ERROR_STOP 0
--table with special column names --
create table foo2 (a integer, "bacB toD" integer, c integer, d integer);
select table_name from create_hypertable('foo2', 'a', chunk_time_interval=> 10);
NOTICE: adding not-null constraint to column "a"
table_name
------------
foo2
(1 row)
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ERROR: cannot use the same column c in compress_orderby and compress_segmentby
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');
select * from _timescaledb_catalog.hypertable_compression order by attname;
hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst
---------------+----------+--------------------------+------------------------+----------------------+-------------+--------------------
1 | a | 4 | | | |
1 | bacB toD | 0 | 1 | | |
1 | c | 0 | 2 | | |
1 | d | 4 | | 1 | t | t
(4 rows)
-- Negative test cases ---
--basic test with count
create table foo (a integer, b integer, c integer);
select table_name from create_hypertable('foo', 'a', chunk_time_interval=> 10);
NOTICE: adding not-null constraint to column "a"
table_name
------------
foo
(1 row)
insert into foo values( 3 , 16 , 20);
insert into foo values( 10 , 10 , 20);
insert into foo values( 20 , 11 , 20);
insert into foo values( 30 , 12 , 20);
-- should error out --
ALTER TABLE foo ALTER b SET NOT NULL, set (timescaledb.compress);
ERROR: ALTER TABLE <hypertable> SET does not support multiple clauses
ALTER TABLE foo ALTER b SET NOT NULL;
select attname, attnotnull from pg_attribute where attrelid = (select oid from pg_class where relname like 'foo') and attname like 'b';
attname | attnotnull
---------+------------
b | t
(1 row)
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_segmentby = 'd');
ERROR: column d in compress_segmentby list does not exist
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'd');
ERROR: column d in compress_orderby list does not exist
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc nulls');
ERROR: expect first/last after nulls in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc nulls thirsty');
ERROR: expect first/last after nulls in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c climb nulls first');
ERROR: unexpected token climb in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c nulls first asC');
ERROR: unexpected token asC in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc nulls first asc');
ERROR: unexpected token asc in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc hurry');
ERROR: unexpected token hurry in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c descend');
ERROR: unexpected token descend in compress_orderby list
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_segmentby = 'c asc' , timescaledb.compress_orderby = 'c');
ERROR: column c asc in compress_segmentby list does not exist

View File

@ -20,6 +20,7 @@ set(TEST_FILES_DEBUG
continuous_aggs_materialize.sql
continuous_aggs_multi.sql
compression.sql
compression_errors.sql
ddl_hook.sql
tsl_tables.sql
)

View File

@ -14,24 +14,27 @@ CREATE OR REPLACE FUNCTION ts_decompress_table(in_table REGCLASS, out_table REGC
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
-- column name, algorithm, idx, asc, nulls_first
--no sgement_byindex (use 0 to indicate that)
CREATE FUNCTION ord(TEXT, INT, INT, BOOL = true, BOOL = false)
RETURNS _timescaledb_catalog.hypertable_compression
AS $$
SELECT (1, $1, $2::SMALLINT, -1, $3::SMALLINT, $4, $5)::_timescaledb_catalog.hypertable_compression
SELECT (1, $1, $2::SMALLINT, 0, $3::SMALLINT+1, $4, $5)::_timescaledb_catalog.hypertable_compression
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- column name, idx, asc, nulls_first
-- no orderby_index. use 0 to indicate that.
CREATE FUNCTION seg(TEXT, INT, BOOL = true, BOOL = false)
RETURNS _timescaledb_catalog.hypertable_compression
AS $$
SELECT (1, $1, 0, $2::SMALLINT, -1, $3, $4)::_timescaledb_catalog.hypertable_compression
SELECT (1, $1, 0, $2::SMALLINT+1, 0, $3, $4)::_timescaledb_catalog.hypertable_compression
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- column name, algorithm
--no orderby or segment by index (use 0 to indicate that)
CREATE FUNCTION com(TEXT, INT)
RETURNS _timescaledb_catalog.hypertable_compression
AS $$
SELECT (1, $1, $2::SMALLINT, -1, -1, true, false)::_timescaledb_catalog.hypertable_compression
SELECT (1, $1, $2::SMALLINT, 0, 0, true, false)::_timescaledb_catalog.hypertable_compression
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
SELECT * FROM ord('time', 4, 0);

View File

@ -5,21 +5,27 @@
--TEST1 ---
--basic test with count
create table foo (a integer, b integer, c integer);
create table foo (a integer, b integer, c integer, d integer);
select table_name from create_hypertable('foo', 'a', chunk_time_interval=> 10);
insert into foo values( 3 , 16 , 20);
insert into foo values( 10 , 10 , 20);
insert into foo values( 20 , 11 , 20);
insert into foo values( 30 , 12 , 20);
insert into foo values( 3 , 16 , 20, 11);
insert into foo values( 10 , 10 , 20, 120);
insert into foo values( 20 , 11 , 20, 13);
insert into foo values( 30 , 12 , 20, 14);
alter table foo set (timescaledb.compress);
alter table foo set (timescaledb.compress, timescaledb.compress_segmentby = 'a,b', timescaledb.compress_orderby = 'c desc, d asc nulls last');
select id, schema_name, table_name, compressed, compressed_hypertable_id from
_timescaledb_catalog.hypertable order by id;
select * from _timescaledb_catalog.hypertable_compression order by hypertable_id, attname;
select compress_chunk( '_timescaledb_internal._hyper_1_2_chunk');
select compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');
select ch1.id, ch1.schema_name, ch1.table_name , ch2.table_name as compress_table
from
_timescaledb_catalog.chunk ch1, _timescaledb_catalog.chunk ch2
where ch1.compressed_chunk_id = ch2.id;
-- should error out --
\set ON_ERROR_STOP 0
ALTER TABLE foo ALTER b SET NOT NULL, set (timescaledb.compress);
\set ON_ERROR_STOP 1
ALTER TABLE foo ALTER b SET NOT NULL;
select attname, attnotnull from pg_attribute where attrelid = (select oid from pg_class where relname like 'foo') and attname like 'b';
--cannot recompress the chunk the second time around
select compress_chunk( '_timescaledb_internal._hyper_1_2_chunk');

View File

@ -0,0 +1,41 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\set ON_ERROR_STOP 0
--table with special column names --
create table foo2 (a integer, "bacB toD" integer, c integer, d integer);
select table_name from create_hypertable('foo2', 'a', chunk_time_interval=> 10);
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');
select * from _timescaledb_catalog.hypertable_compression order by attname;
-- Negative test cases ---
--basic test with count
create table foo (a integer, b integer, c integer);
select table_name from create_hypertable('foo', 'a', chunk_time_interval=> 10);
insert into foo values( 3 , 16 , 20);
insert into foo values( 10 , 10 , 20);
insert into foo values( 20 , 11 , 20);
insert into foo values( 30 , 12 , 20);
-- should error out --
ALTER TABLE foo ALTER b SET NOT NULL, set (timescaledb.compress);
ALTER TABLE foo ALTER b SET NOT NULL;
select attname, attnotnull from pg_attribute where attrelid = (select oid from pg_class where relname like 'foo') and attname like 'b';
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_segmentby = 'd');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'd');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc nulls');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc nulls thirsty');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c climb nulls first');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c nulls first asC');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc nulls first asc');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c desc hurry');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'c descend');
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_segmentby = 'c asc' , timescaledb.compress_orderby = 'c');