mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 19:13:16 +08:00
Refactor compression settings handling
This patch moves the check function for orderby and segmentby into dedicated functions and cleans up a few other places where code was redundant.
This commit is contained in:
parent
a69c4682ce
commit
b819aa24c0
@ -48,20 +48,8 @@
|
|||||||
#include "utils.h"
|
#include "utils.h"
|
||||||
#include "guc.h"
|
#include "guc.h"
|
||||||
|
|
||||||
/* entrypoint
|
static List *build_columndefs(CompressionSettings *settings, Oid src_relid);
|
||||||
* tsl_process_compress_table : is the entry point.
|
static ColumnDef *build_columndef_singlecolumn(const char *colname, Oid typid);
|
||||||
*/
|
|
||||||
typedef struct CompressColInfo
|
|
||||||
{
|
|
||||||
int numcols;
|
|
||||||
Oid compresseddata_oid;
|
|
||||||
CompressionSettings *settings;
|
|
||||||
List *coldeflist; /*list of ColumnDef for the compressed column */
|
|
||||||
} CompressColInfo;
|
|
||||||
|
|
||||||
static void compresscolinfo_init(CompressColInfo *cc, Oid srctbl_relid, List *segmentby_cols,
|
|
||||||
List *orderby_cols);
|
|
||||||
static void compresscolinfo_init_singlecolumn(CompressColInfo *cc, const char *colname, Oid typid);
|
|
||||||
|
|
||||||
#define PRINT_COMPRESSION_TABLE_NAME(buf, prefix, hypertable_id) \
|
#define PRINT_COMPRESSION_TABLE_NAME(buf, prefix, hypertable_id) \
|
||||||
do \
|
do \
|
||||||
@ -107,22 +95,141 @@ column_segment_max_name(int16 column_index)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
compresscolinfo_add_metadata_columns(CompressColInfo *cc)
|
check_segmentby(Oid relid, List *segmentby_cols)
|
||||||
{
|
{
|
||||||
CompressionSettings *settings = cc->settings;
|
ListCell *lc;
|
||||||
|
ArrayType *segmentby = NULL;
|
||||||
|
|
||||||
/* additional metadata columns.
|
foreach (lc, segmentby_cols)
|
||||||
* these are not listed in hypertable_compression catalog table
|
{
|
||||||
* and so only has a ColDef entry */
|
CompressedParsedCol *col = (CompressedParsedCol *) lfirst(lc);
|
||||||
|
AttrNumber col_attno = get_attnum(relid, NameStr(col->colname));
|
||||||
|
if (col_attno == InvalidAttrNumber)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("column \"%s\" does not exist", NameStr(col->colname)),
|
||||||
|
errhint("The timescaledb.compress_segmentby option must reference a valid "
|
||||||
|
"column.")));
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *col_attname = get_attname(relid, col_attno, false);
|
||||||
|
|
||||||
|
/* check if segmentby columns are distinct. */
|
||||||
|
if (ts_array_is_member(segmentby, col_attname))
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("duplicate column name \"%s\"", NameStr(col->colname)),
|
||||||
|
errhint("The timescaledb.compress_segmentby option must reference distinct "
|
||||||
|
"column.")));
|
||||||
|
segmentby = ts_array_add_element_text(segmentby, col_attname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
check_orderby(Oid relid, List *orderby_cols, ArrayType *segmentby)
|
||||||
|
{
|
||||||
|
ArrayType *orderby = NULL;
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
foreach (lc, orderby_cols)
|
||||||
|
{
|
||||||
|
CompressedParsedCol *col = (CompressedParsedCol *) lfirst(lc);
|
||||||
|
AttrNumber col_attno = get_attnum(relid, NameStr(col->colname));
|
||||||
|
|
||||||
|
if (col_attno == InvalidAttrNumber)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("column \"%s\" does not exist", NameStr(col->colname)),
|
||||||
|
errhint("The timescaledb.compress_orderby option must reference a valid "
|
||||||
|
"column.")));
|
||||||
|
|
||||||
|
const char *col_attname = get_attname(relid, col_attno, false);
|
||||||
|
|
||||||
|
/* check if orderby columns are distinct. */
|
||||||
|
if (ts_array_is_member(orderby, col_attname))
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("duplicate column name \"%s\"", NameStr(col->colname)),
|
||||||
|
errhint("The timescaledb.compress_orderby option must reference distinct "
|
||||||
|
"column.")));
|
||||||
|
|
||||||
|
/* check if orderby_cols and segmentby_cols are distinct */
|
||||||
|
if (ts_array_is_member(segmentby, col_attname))
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("cannot use column \"%s\" for both ordering and segmenting",
|
||||||
|
col_attname),
|
||||||
|
errhint("Use separate columns for the timescaledb.compress_orderby and"
|
||||||
|
" timescaledb.compress_segmentby options.")));
|
||||||
|
|
||||||
|
orderby = ts_array_add_element_text(orderby, col_attname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* return the columndef list for compressed hypertable.
|
||||||
|
* we do this by getting the source hypertable's attrs,
|
||||||
|
* 1. validate the segmentby cols and orderby cols exists in this list and
|
||||||
|
* 2. create the columndefs for the new compressed hypertable
|
||||||
|
* segmentby_cols have same datatype as the original table
|
||||||
|
* all other cols have COMPRESSEDDATA_TYPE type
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
build_columndefs(CompressionSettings *settings, Oid src_relid)
|
||||||
|
{
|
||||||
|
Oid compresseddata_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
|
||||||
|
ArrayType *segmentby = settings->fd.segmentby;
|
||||||
|
List *column_defs = NIL;
|
||||||
|
|
||||||
|
Relation rel = table_open(src_relid, AccessShareLock);
|
||||||
|
TupleDesc tupdesc = rel->rd_att;
|
||||||
|
|
||||||
|
for (int attno = 0; attno < tupdesc->natts; attno++)
|
||||||
|
{
|
||||||
|
Oid attroid = InvalidOid;
|
||||||
|
int32 typmod = -1;
|
||||||
|
Oid collid = 0;
|
||||||
|
|
||||||
|
Form_pg_attribute attr = TupleDescAttr(tupdesc, attno);
|
||||||
|
ColumnDef *coldef;
|
||||||
|
if (attr->attisdropped)
|
||||||
|
continue;
|
||||||
|
if (strncmp(NameStr(attr->attname),
|
||||||
|
COMPRESSION_COLUMN_METADATA_PREFIX,
|
||||||
|
strlen(COMPRESSION_COLUMN_METADATA_PREFIX)) == 0)
|
||||||
|
elog(ERROR,
|
||||||
|
"cannot compress tables with reserved column prefix '%s'",
|
||||||
|
COMPRESSION_COLUMN_METADATA_PREFIX);
|
||||||
|
|
||||||
|
bool is_segmentby = ts_array_is_member(segmentby, NameStr(attr->attname));
|
||||||
|
|
||||||
|
if (is_segmentby)
|
||||||
|
{
|
||||||
|
attroid = attr->atttypid; /*segment by columns have original type */
|
||||||
|
typmod = attr->atttypmod;
|
||||||
|
collid = attr->attcollation;
|
||||||
|
}
|
||||||
|
if (!OidIsValid(attroid))
|
||||||
|
{
|
||||||
|
attroid = compresseddata_oid; /* default type for column */
|
||||||
|
}
|
||||||
|
coldef = makeColumnDef(NameStr(attr->attname), attroid, typmod, collid);
|
||||||
|
column_defs = lappend(column_defs, coldef);
|
||||||
|
}
|
||||||
|
|
||||||
|
table_close(rel, AccessShareLock);
|
||||||
|
|
||||||
|
/* additional metadata columns. */
|
||||||
|
|
||||||
/* count of the number of uncompressed rows */
|
/* count of the number of uncompressed rows */
|
||||||
cc->coldeflist = lappend(cc->coldeflist,
|
column_defs = lappend(column_defs,
|
||||||
makeColumnDef(COMPRESSION_COLUMN_METADATA_COUNT_NAME,
|
makeColumnDef(COMPRESSION_COLUMN_METADATA_COUNT_NAME,
|
||||||
INT4OID,
|
INT4OID,
|
||||||
-1 /* typemod */,
|
-1 /* typemod */,
|
||||||
0 /*collation*/));
|
0 /*collation*/));
|
||||||
/* sequence_num column */
|
/* sequence_num column */
|
||||||
cc->coldeflist = lappend(cc->coldeflist,
|
column_defs = lappend(column_defs,
|
||||||
makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME,
|
makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME,
|
||||||
INT4OID,
|
INT4OID,
|
||||||
-1 /* typemod */,
|
-1 /* typemod */,
|
||||||
@ -147,12 +254,12 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc)
|
|||||||
errdetail("Could not identify a less-than operator for the type.")));
|
errdetail("Could not identify a less-than operator for the type.")));
|
||||||
|
|
||||||
/* segment_meta min and max columns */
|
/* segment_meta min and max columns */
|
||||||
cc->coldeflist = lappend(cc->coldeflist,
|
column_defs = lappend(column_defs,
|
||||||
makeColumnDef(column_segment_min_name(index),
|
makeColumnDef(column_segment_min_name(index),
|
||||||
col_type,
|
col_type,
|
||||||
-1 /* typemod */,
|
-1 /* typemod */,
|
||||||
0 /*collation*/));
|
0 /*collation*/));
|
||||||
cc->coldeflist = lappend(cc->coldeflist,
|
column_defs = lappend(column_defs,
|
||||||
makeColumnDef(column_segment_max_name(index),
|
makeColumnDef(column_segment_max_name(index),
|
||||||
col_type,
|
col_type,
|
||||||
-1 /* typemod */,
|
-1 /* typemod */,
|
||||||
@ -160,180 +267,37 @@ compresscolinfo_add_metadata_columns(CompressColInfo *cc)
|
|||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return column_defs;
|
||||||
|
|
||||||
/*
|
|
||||||
* return the columndef list for compressed hypertable.
|
|
||||||
* we do this by getting the source hypertable's attrs,
|
|
||||||
* 1. validate the segmentby cols and orderby cols exists in this list and
|
|
||||||
* 2. create the columndefs for the new compressed hypertable
|
|
||||||
* segmentby_cols have same datatype as the original table
|
|
||||||
* all other cols have COMPRESSEDDATA_TYPE type
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
compresscolinfo_init(CompressColInfo *cc, Oid srctbl_relid, List *segmentby_cols,
|
|
||||||
List *orderby_cols)
|
|
||||||
{
|
|
||||||
Relation rel;
|
|
||||||
TupleDesc tupdesc;
|
|
||||||
int colno, attno;
|
|
||||||
ListCell *lc;
|
|
||||||
cc->compresseddata_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
|
|
||||||
|
|
||||||
rel = table_open(srctbl_relid, AccessShareLock);
|
|
||||||
tupdesc = rel->rd_att;
|
|
||||||
ArrayType *segmentby = NULL;
|
|
||||||
|
|
||||||
foreach (lc, segmentby_cols)
|
|
||||||
{
|
|
||||||
CompressedParsedCol *col = (CompressedParsedCol *) lfirst(lc);
|
|
||||||
AttrNumber col_attno = get_attnum(rel->rd_id, NameStr(col->colname));
|
|
||||||
if (col_attno == InvalidAttrNumber)
|
|
||||||
{
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
||||||
errmsg("column \"%s\" does not exist", NameStr(col->colname)),
|
|
||||||
errhint("The timescaledb.compress_segmentby option must reference a valid "
|
|
||||||
"column.")));
|
|
||||||
}
|
|
||||||
|
|
||||||
const char *col_attname = get_attname(rel->rd_id, col_attno, false);
|
|
||||||
|
|
||||||
/* check if segmentby columns are distinct. */
|
|
||||||
if (ts_array_is_member(segmentby, col_attname))
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
||||||
errmsg("duplicate column name \"%s\"", NameStr(col->colname)),
|
|
||||||
errhint("The timescaledb.compress_segmentby option must reference distinct "
|
|
||||||
"column.")));
|
|
||||||
segmentby = ts_array_add_element_text(segmentby, col_attname);
|
|
||||||
}
|
|
||||||
/* the column indexes are numbered as seg_attnolen + <orderby_index>
|
|
||||||
*/
|
|
||||||
ArrayType *orderby = NULL;
|
|
||||||
ArrayType *orderby_desc = NULL;
|
|
||||||
ArrayType *orderby_nullsfirst = NULL;
|
|
||||||
|
|
||||||
foreach (lc, orderby_cols)
|
|
||||||
{
|
|
||||||
CompressedParsedCol *col = (CompressedParsedCol *) lfirst(lc);
|
|
||||||
AttrNumber col_attno = get_attnum(rel->rd_id, NameStr(col->colname));
|
|
||||||
|
|
||||||
if (col_attno == InvalidAttrNumber)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
||||||
errmsg("column \"%s\" does not exist", NameStr(col->colname)),
|
|
||||||
errhint("The timescaledb.compress_orderby option must reference a valid "
|
|
||||||
"column.")));
|
|
||||||
|
|
||||||
const char *col_attname = get_attname(rel->rd_id, col_attno, false);
|
|
||||||
|
|
||||||
/* check if orderby columns are distinct. */
|
|
||||||
if (ts_array_is_member(orderby, col_attname))
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
||||||
errmsg("duplicate column name \"%s\"", NameStr(col->colname)),
|
|
||||||
errhint("The timescaledb.compress_orderby option must reference distinct "
|
|
||||||
"column.")));
|
|
||||||
|
|
||||||
/* check if orderby_cols and segmentby_cols are distinct */
|
|
||||||
if (ts_array_is_member(segmentby, col_attname))
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
||||||
errmsg("cannot use column \"%s\" for both ordering and segmenting",
|
|
||||||
col_attname),
|
|
||||||
errhint("Use separate columns for the timescaledb.compress_orderby and"
|
|
||||||
" timescaledb.compress_segmentby options.")));
|
|
||||||
|
|
||||||
orderby = ts_array_add_element_text(orderby, col_attname);
|
|
||||||
orderby_desc = ts_array_add_element_bool(orderby_desc, !col->asc);
|
|
||||||
orderby_nullsfirst = ts_array_add_element_bool(orderby_nullsfirst, col->nullsfirst);
|
|
||||||
}
|
|
||||||
|
|
||||||
cc->numcols = 0;
|
|
||||||
cc->coldeflist = NIL;
|
|
||||||
colno = 0;
|
|
||||||
for (attno = 0; attno < tupdesc->natts; attno++)
|
|
||||||
{
|
|
||||||
Oid attroid = InvalidOid;
|
|
||||||
int32 typmod = -1;
|
|
||||||
Oid collid = 0;
|
|
||||||
|
|
||||||
Form_pg_attribute attr = TupleDescAttr(tupdesc, attno);
|
|
||||||
ColumnDef *coldef;
|
|
||||||
if (attr->attisdropped)
|
|
||||||
continue;
|
|
||||||
if (strncmp(NameStr(attr->attname),
|
|
||||||
COMPRESSION_COLUMN_METADATA_PREFIX,
|
|
||||||
strlen(COMPRESSION_COLUMN_METADATA_PREFIX)) == 0)
|
|
||||||
elog(ERROR,
|
|
||||||
"cannot compress tables with reserved column prefix '%s'",
|
|
||||||
COMPRESSION_COLUMN_METADATA_PREFIX);
|
|
||||||
|
|
||||||
bool is_segmentby = ts_array_is_member(segmentby, NameStr(attr->attname));
|
|
||||||
bool is_orderby = ts_array_is_member(orderby, NameStr(attr->attname));
|
|
||||||
|
|
||||||
if (is_segmentby || is_orderby)
|
|
||||||
{
|
|
||||||
if (is_segmentby)
|
|
||||||
{
|
|
||||||
attroid = attr->atttypid; /*segment by columns have original type */
|
|
||||||
typmod = attr->atttypmod;
|
|
||||||
collid = attr->attcollation;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!OidIsValid(attroid))
|
|
||||||
{
|
|
||||||
attroid = cc->compresseddata_oid; /* default type for column */
|
|
||||||
}
|
|
||||||
coldef = makeColumnDef(NameStr(attr->attname), attroid, typmod, collid);
|
|
||||||
cc->coldeflist = lappend(cc->coldeflist, coldef);
|
|
||||||
colno++;
|
|
||||||
}
|
|
||||||
cc->numcols = colno;
|
|
||||||
|
|
||||||
cc->settings->fd.segmentby = segmentby;
|
|
||||||
cc->settings->fd.orderby = orderby;
|
|
||||||
cc->settings->fd.orderby_desc = orderby_desc;
|
|
||||||
cc->settings->fd.orderby_nullsfirst = orderby_nullsfirst;
|
|
||||||
|
|
||||||
ts_compression_settings_update(cc->settings);
|
|
||||||
|
|
||||||
table_close(rel, AccessShareLock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* use this api for the case when you add a single column to a table that already has
|
/* use this api for the case when you add a single column to a table that already has
|
||||||
* compression setup
|
* compression setup
|
||||||
* such as ALTER TABLE xyz ADD COLUMN .....
|
* such as ALTER TABLE xyz ADD COLUMN .....
|
||||||
*/
|
*/
|
||||||
static void
|
static ColumnDef *
|
||||||
compresscolinfo_init_singlecolumn(CompressColInfo *cc, const char *colname, Oid typid)
|
build_columndef_singlecolumn(const char *colname, Oid typid)
|
||||||
{
|
{
|
||||||
cc->compresseddata_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
|
Oid compresseddata_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
|
||||||
ColumnDef *coldef;
|
|
||||||
|
|
||||||
cc->numcols = 1;
|
return makeColumnDef(colname, compresseddata_oid, -1 /*typmod*/, 0 /*collation*/);
|
||||||
cc->coldeflist = NIL;
|
|
||||||
coldef = makeColumnDef(colname, cc->compresseddata_oid, -1 /*typmod*/, 0 /*collation*/);
|
|
||||||
cc->coldeflist = lappend(cc->coldeflist, coldef);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* modify storage attributes for toast table columns attached to the
|
/* modify storage attributes for toast table columns attached to the
|
||||||
* compression table
|
* compression table
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
modify_compressed_toast_table_storage(CompressColInfo *cc, Oid compress_relid)
|
modify_compressed_toast_table_storage(CompressionSettings *settings, List *coldefs,
|
||||||
|
Oid compress_relid)
|
||||||
{
|
{
|
||||||
CompressionSettings *settings = cc->settings;
|
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
List *cmds = NIL;
|
List *cmds = NIL;
|
||||||
|
Oid compresseddata_oid = ts_custom_type_cache_get(CUSTOM_TYPE_COMPRESSED_DATA)->type_oid;
|
||||||
|
|
||||||
foreach (lc, cc->coldeflist)
|
foreach (lc, coldefs)
|
||||||
{
|
{
|
||||||
ColumnDef *cd = lfirst_node(ColumnDef, lc);
|
ColumnDef *cd = lfirst_node(ColumnDef, lc);
|
||||||
AttrNumber attno = get_attnum(compress_relid, cd->colname);
|
AttrNumber attno = get_attnum(compress_relid, cd->colname);
|
||||||
if (attno != InvalidAttrNumber &&
|
if (attno != InvalidAttrNumber && get_atttype(compress_relid, attno) == compresseddata_oid)
|
||||||
get_atttype(compress_relid, attno) == cc->compresseddata_oid)
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* All columns that pass the datatype check are columns
|
* All columns that pass the datatype check are columns
|
||||||
@ -367,10 +331,9 @@ modify_compressed_toast_table_storage(CompressColInfo *cc, Oid compress_relid)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
create_compressed_table_indexes(Oid compresstable_relid, CompressColInfo *compress_cols)
|
create_compressed_table_indexes(Oid compresstable_relid, CompressionSettings *settings)
|
||||||
{
|
{
|
||||||
Cache *hcache;
|
Cache *hcache;
|
||||||
CompressionSettings *settings = compress_cols->settings;
|
|
||||||
Hypertable *ht =
|
Hypertable *ht =
|
||||||
ts_hypertable_cache_get_cache_and_entry(compresstable_relid, CACHE_FLAG_NONE, &hcache);
|
ts_hypertable_cache_get_cache_and_entry(compresstable_relid, CACHE_FLAG_NONE, &hcache);
|
||||||
IndexStmt stmt = {
|
IndexStmt stmt = {
|
||||||
@ -391,7 +354,7 @@ create_compressed_table_indexes(Oid compresstable_relid, CompressColInfo *compre
|
|||||||
|
|
||||||
StringInfo buf = makeStringInfo();
|
StringInfo buf = makeStringInfo();
|
||||||
|
|
||||||
if (compress_cols->settings->fd.segmentby)
|
if (settings->fd.segmentby)
|
||||||
{
|
{
|
||||||
Datum datum;
|
Datum datum;
|
||||||
bool isnull;
|
bool isnull;
|
||||||
@ -512,7 +475,8 @@ set_toast_tuple_target_on_compressed(Oid compressed_table_id)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int32
|
static int32
|
||||||
create_compression_table(Oid owner, CompressColInfo *compress_cols, Oid tablespace_oid)
|
create_compression_table(Oid owner, CompressionSettings *settings, List *coldefs,
|
||||||
|
Oid tablespace_oid)
|
||||||
{
|
{
|
||||||
ObjectAddress tbladdress;
|
ObjectAddress tbladdress;
|
||||||
char relnamebuf[NAMEDATALEN];
|
char relnamebuf[NAMEDATALEN];
|
||||||
@ -526,7 +490,7 @@ create_compression_table(Oid owner, CompressColInfo *compress_cols, Oid tablespa
|
|||||||
int32 compress_hypertable_id;
|
int32 compress_hypertable_id;
|
||||||
|
|
||||||
create = makeNode(CreateStmt);
|
create = makeNode(CreateStmt);
|
||||||
create->tableElts = compress_cols->coldeflist;
|
create->tableElts = coldefs;
|
||||||
create->inhRelations = NIL;
|
create->inhRelations = NIL;
|
||||||
create->ofTypename = NULL;
|
create->ofTypename = NULL;
|
||||||
create->constraints = NIL;
|
create->constraints = NIL;
|
||||||
@ -554,13 +518,13 @@ create_compression_table(Oid owner, CompressColInfo *compress_cols, Oid tablespa
|
|||||||
(void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
|
(void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
|
||||||
NewRelationCreateToastTable(compress_relid, toast_options);
|
NewRelationCreateToastTable(compress_relid, toast_options);
|
||||||
ts_catalog_restore_user(&sec_ctx);
|
ts_catalog_restore_user(&sec_ctx);
|
||||||
modify_compressed_toast_table_storage(compress_cols, compress_relid);
|
modify_compressed_toast_table_storage(settings, coldefs, compress_relid);
|
||||||
ts_hypertable_create_compressed(compress_relid, compress_hypertable_id);
|
ts_hypertable_create_compressed(compress_relid, compress_hypertable_id);
|
||||||
|
|
||||||
set_statistics_on_compressed_table(compress_relid);
|
set_statistics_on_compressed_table(compress_relid);
|
||||||
set_toast_tuple_target_on_compressed(compress_relid);
|
set_toast_tuple_target_on_compressed(compress_relid);
|
||||||
|
|
||||||
create_compressed_table_indexes(compress_relid, compress_cols);
|
create_compressed_table_indexes(compress_relid, settings);
|
||||||
return compress_hypertable_id;
|
return compress_hypertable_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -711,7 +675,7 @@ add_time_to_order_by_if_not_included(List *orderby_cols, List *segmentby_cols, H
|
|||||||
* This is limited to foreign key constraints now
|
* This is limited to foreign key constraints now
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
validate_existing_constraints(Hypertable *ht, CompressColInfo *colinfo, List **indexes)
|
validate_existing_constraints(Hypertable *ht, CompressionSettings *settings, List **indexes)
|
||||||
{
|
{
|
||||||
Relation pg_constr;
|
Relation pg_constr;
|
||||||
SysScanDesc scan;
|
SysScanDesc scan;
|
||||||
@ -719,7 +683,6 @@ validate_existing_constraints(Hypertable *ht, CompressColInfo *colinfo, List **i
|
|||||||
HeapTuple tuple;
|
HeapTuple tuple;
|
||||||
List *conlist = NIL;
|
List *conlist = NIL;
|
||||||
ArrayType *arr;
|
ArrayType *arr;
|
||||||
CompressionSettings *settings = colinfo->settings;
|
|
||||||
|
|
||||||
pg_constr = table_open(ConstraintRelationId, AccessShareLock);
|
pg_constr = table_open(ConstraintRelationId, AccessShareLock);
|
||||||
|
|
||||||
@ -829,13 +792,12 @@ validate_existing_constraints(Hypertable *ht, CompressColInfo *colinfo, List **i
|
|||||||
* by the constraint checking above.
|
* by the constraint checking above.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
validate_existing_indexes(Hypertable *ht, CompressColInfo *colinfo, List *ignore)
|
validate_existing_indexes(Hypertable *ht, CompressionSettings *settings, List *ignore)
|
||||||
{
|
{
|
||||||
Relation pg_index;
|
Relation pg_index;
|
||||||
HeapTuple htup;
|
HeapTuple htup;
|
||||||
ScanKeyData skey;
|
ScanKeyData skey;
|
||||||
SysScanDesc indscan;
|
SysScanDesc indscan;
|
||||||
CompressionSettings *settings = colinfo->settings;
|
|
||||||
|
|
||||||
ScanKeyInit(&skey,
|
ScanKeyInit(&skey,
|
||||||
Anum_pg_index_indrelid,
|
Anum_pg_index_indrelid,
|
||||||
@ -993,12 +955,11 @@ disable_compression(Hypertable *ht, WithClauseResult *with_clause_options)
|
|||||||
|
|
||||||
/* Add column to internal compression table */
|
/* Add column to internal compression table */
|
||||||
static void
|
static void
|
||||||
add_column_to_compression_table(Hypertable *compress_ht, CompressColInfo *compress_cols)
|
add_column_to_compression_table(Hypertable *compress_ht, CompressionSettings *settings,
|
||||||
|
ColumnDef *coldef)
|
||||||
{
|
{
|
||||||
Oid compress_relid = compress_ht->main_table_relid;
|
Oid compress_relid = compress_ht->main_table_relid;
|
||||||
ColumnDef *coldef;
|
|
||||||
AlterTableCmd *addcol_cmd;
|
AlterTableCmd *addcol_cmd;
|
||||||
coldef = (ColumnDef *) linitial(compress_cols->coldeflist);
|
|
||||||
|
|
||||||
/* create altertable stmt to add column to the compressed hypertable */
|
/* create altertable stmt to add column to the compressed hypertable */
|
||||||
Assert(TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(compress_ht));
|
Assert(TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(compress_ht));
|
||||||
@ -1009,7 +970,7 @@ add_column_to_compression_table(Hypertable *compress_ht, CompressColInfo *compre
|
|||||||
|
|
||||||
/* alter the table and add column */
|
/* alter the table and add column */
|
||||||
ts_alter_table_with_event_trigger(compress_relid, NULL, list_make1(addcol_cmd), true);
|
ts_alter_table_with_event_trigger(compress_relid, NULL, list_make1(addcol_cmd), true);
|
||||||
modify_compressed_toast_table_storage(compress_cols, compress_relid);
|
modify_compressed_toast_table_storage(settings, list_make1(coldef), compress_relid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Drop column from internal compression table */
|
/* Drop column from internal compression table */
|
||||||
@ -1071,7 +1032,6 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
|
|||||||
WithClauseResult *with_clause_options)
|
WithClauseResult *with_clause_options)
|
||||||
{
|
{
|
||||||
int32 compress_htid;
|
int32 compress_htid;
|
||||||
struct CompressColInfo compress_cols = { 0 };
|
|
||||||
bool compress_enable = DatumGetBool(with_clause_options[CompressEnabled].parsed);
|
bool compress_enable = DatumGetBool(with_clause_options[CompressEnabled].parsed);
|
||||||
Oid ownerid;
|
Oid ownerid;
|
||||||
List *segmentby_cols;
|
List *segmentby_cols;
|
||||||
@ -1122,8 +1082,12 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
|
|||||||
if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht))
|
if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht))
|
||||||
check_modify_compression_options(ht, settings, with_clause_options, orderby_cols);
|
check_modify_compression_options(ht, settings, with_clause_options, orderby_cols);
|
||||||
|
|
||||||
compress_cols.settings = settings;
|
compression_settings_update(settings, with_clause_options, segmentby_cols, orderby_cols);
|
||||||
compresscolinfo_init(&compress_cols, ht->main_table_relid, segmentby_cols, orderby_cols);
|
|
||||||
|
check_segmentby(ht->main_table_relid, segmentby_cols);
|
||||||
|
check_orderby(ht->main_table_relid, orderby_cols, settings->fd.segmentby);
|
||||||
|
|
||||||
|
List *column_defs = build_columndefs(settings, ht->main_table_relid);
|
||||||
|
|
||||||
/* take explicit locks on catalog tables and keep them till end of txn */
|
/* take explicit locks on catalog tables and keep them till end of txn */
|
||||||
LockRelationOid(catalog_get_table_id(ts_catalog_get(), HYPERTABLE), RowExclusiveLock);
|
LockRelationOid(catalog_get_table_id(ts_catalog_get(), HYPERTABLE), RowExclusiveLock);
|
||||||
@ -1134,22 +1098,15 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
|
|||||||
drop_existing_compression_table(ht);
|
drop_existing_compression_table(ht);
|
||||||
}
|
}
|
||||||
|
|
||||||
compression_settings_update(compress_cols.settings,
|
|
||||||
with_clause_options,
|
|
||||||
segmentby_cols,
|
|
||||||
orderby_cols);
|
|
||||||
|
|
||||||
/* Check if we can create a compressed hypertable with existing
|
/* Check if we can create a compressed hypertable with existing
|
||||||
* constraints and indexes. */
|
* constraints and indexes. */
|
||||||
List *indexes = NIL;
|
List *indexes = NIL;
|
||||||
constraint_list = validate_existing_constraints(ht, &compress_cols, &indexes);
|
constraint_list = validate_existing_constraints(ht, settings, &indexes);
|
||||||
validate_existing_indexes(ht, &compress_cols, indexes);
|
validate_existing_indexes(ht, settings, indexes);
|
||||||
list_free(indexes);
|
list_free(indexes);
|
||||||
|
|
||||||
compresscolinfo_add_metadata_columns(&compress_cols);
|
|
||||||
|
|
||||||
Oid tablespace_oid = get_rel_tablespace(ht->main_table_relid);
|
Oid tablespace_oid = get_rel_tablespace(ht->main_table_relid);
|
||||||
compress_htid = create_compression_table(ownerid, &compress_cols, tablespace_oid);
|
compress_htid = create_compression_table(ownerid, settings, column_defs, tablespace_oid);
|
||||||
ts_hypertable_set_compressed(ht, compress_htid);
|
ts_hypertable_set_compressed(ht, compress_htid);
|
||||||
|
|
||||||
if (!with_clause_options[CompressChunkTimeInterval].is_default)
|
if (!with_clause_options[CompressChunkTimeInterval].is_default)
|
||||||
@ -1169,8 +1126,6 @@ static void
|
|||||||
compression_settings_update(CompressionSettings *settings, WithClauseResult *with_clause_options,
|
compression_settings_update(CompressionSettings *settings, WithClauseResult *with_clause_options,
|
||||||
List *segmentby_cols, List *orderby_cols)
|
List *segmentby_cols, List *orderby_cols)
|
||||||
{
|
{
|
||||||
bool need_update = false;
|
|
||||||
|
|
||||||
/* orderby arrays should always be in sync either all NULL or none */
|
/* orderby arrays should always be in sync either all NULL or none */
|
||||||
Assert(
|
Assert(
|
||||||
(settings->fd.orderby && settings->fd.orderby_desc && settings->fd.orderby_nullsfirst) ||
|
(settings->fd.orderby && settings->fd.orderby_desc && settings->fd.orderby_nullsfirst) ||
|
||||||
@ -1195,7 +1150,6 @@ compression_settings_update(CompressionSettings *settings, WithClauseResult *wit
|
|||||||
{
|
{
|
||||||
settings->fd.segmentby = NULL;
|
settings->fd.segmentby = NULL;
|
||||||
}
|
}
|
||||||
need_update = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!with_clause_options[CompressOrderBy].is_default)
|
if (!with_clause_options[CompressOrderBy].is_default)
|
||||||
@ -1241,7 +1195,6 @@ compression_settings_update(CompressionSettings *settings, WithClauseResult *wit
|
|||||||
settings->fd.orderby_desc = NULL;
|
settings->fd.orderby_desc = NULL;
|
||||||
settings->fd.orderby_nullsfirst = NULL;
|
settings->fd.orderby_nullsfirst = NULL;
|
||||||
}
|
}
|
||||||
need_update = true;
|
|
||||||
}
|
}
|
||||||
else if (orderby_cols && !settings->fd.orderby)
|
else if (orderby_cols && !settings->fd.orderby)
|
||||||
{
|
{
|
||||||
@ -1275,14 +1228,9 @@ compression_settings_update(CompressionSettings *settings, WithClauseResult *wit
|
|||||||
1,
|
1,
|
||||||
true,
|
true,
|
||||||
TYPALIGN_CHAR);
|
TYPALIGN_CHAR);
|
||||||
need_update = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ts_compression_settings_update(settings);
|
ts_compression_settings_update(settings);
|
||||||
if (need_update)
|
|
||||||
{
|
|
||||||
ts_compression_settings_update(settings);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a column to a table that has compression enabled
|
/* Add a column to a table that has compression enabled
|
||||||
@ -1291,12 +1239,7 @@ compression_settings_update(CompressionSettings *settings, WithClauseResult *wit
|
|||||||
void
|
void
|
||||||
tsl_process_compress_table_add_column(Hypertable *ht, ColumnDef *orig_def)
|
tsl_process_compress_table_add_column(Hypertable *ht, ColumnDef *orig_def)
|
||||||
{
|
{
|
||||||
struct CompressColInfo compress_cols = { 0 };
|
|
||||||
Oid coloid;
|
|
||||||
char *colname = orig_def->colname;
|
|
||||||
|
|
||||||
ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);
|
ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);
|
||||||
|
|
||||||
if (!TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
|
if (!TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
@ -1304,27 +1247,16 @@ tsl_process_compress_table_add_column(Hypertable *ht, ColumnDef *orig_def)
|
|||||||
|
|
||||||
Hypertable *compress_ht = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id);
|
Hypertable *compress_ht = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id);
|
||||||
/* don't add column if it already exists */
|
/* don't add column if it already exists */
|
||||||
if (get_attnum(compress_ht->main_table_relid, colname) != InvalidAttrNumber)
|
if (get_attnum(compress_ht->main_table_relid, orig_def->colname) != InvalidAttrNumber)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TypeName *orig_typname = orig_def->typeName;
|
Oid coloid = LookupTypeNameOid(NULL, orig_def->typeName, false);
|
||||||
coloid = LookupTypeNameOid(NULL, orig_typname, false);
|
ColumnDef *coldef = build_columndef_singlecolumn(orig_def->colname, coloid);
|
||||||
compresscolinfo_init_singlecolumn(&compress_cols, colname, coloid);
|
CompressionSettings *settings = ts_compression_settings_get(ht->main_table_relid);
|
||||||
|
|
||||||
if (TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht))
|
add_column_to_compression_table(compress_ht, settings, coldef);
|
||||||
{
|
|
||||||
int32 compress_htid = ht->fd.compressed_hypertable_id;
|
|
||||||
Hypertable *compress_ht = ts_hypertable_get_by_id(compress_htid);
|
|
||||||
compress_cols.settings = ts_compression_settings_get(ht->main_table_relid);
|
|
||||||
|
|
||||||
add_column_to_compression_table(compress_ht, &compress_cols);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
Assert(TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Drop a column from a table that has compression enabled
|
/* Drop a column from a table that has compression enabled
|
||||||
|
Loading…
x
Reference in New Issue
Block a user