Allow altering compression options

We now allow changing the compression options on a hypertable
as long as there are no existing compressed chunks.
Matvey Arye 2019-08-16 14:58:48 -04:00 committed by Matvey Arye
parent eba612ea2e
commit cdf6fcb69a
12 changed files with 179 additions and 15 deletions
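
In practice, compression settings can now be revised at any point before the first chunk is compressed. A minimal sketch of the new behavior, assuming foo2 is a hypertable partitioned on time column "a" as in the regression tests below:

ALTER TABLE foo2 SET (timescaledb.compress,
    timescaledb.compress_segmentby = '"bacB toD",c',
    timescaledb.compress_orderby = 'd');
-- No chunks compressed yet, so the options may still be changed:
ALTER TABLE foo2 SET (timescaledb.compress,
    timescaledb.compress_segmentby = '"bacB toD",c',
    timescaledb.compress_orderby = 'd DESC');
-- After compress_chunk() has run on any chunk, the same ALTER fails:
-- ERROR: cannot change compression options if compressed chunks exist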

View File

@@ -54,6 +54,7 @@
#include "hypertable_cache.h"
#include "cache.h"
#include "bgw_policy/chunk_stats.h"
#include "scan_iterator.h"
TS_FUNCTION_INFO_V1(ts_chunk_show_chunks);
TS_FUNCTION_INFO_V1(ts_chunk_drop_chunks);
@@ -1749,6 +1750,41 @@ ts_chunk_delete_by_hypertable_id(int32 hypertable_id)
CurrentMemoryContext);
}
static void
init_scan_by_hypertable_id(ScanIterator *iterator, int32 hypertable_id)
{
iterator->ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_HYPERTABLE_ID_INDEX);
ts_scan_iterator_scan_key_init(iterator,
Anum_chunk_hypertable_id_idx_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(hypertable_id));
}
bool
ts_chunk_exists_with_compression(int32 hypertable_id)
{
ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
bool found = false;
init_scan_by_hypertable_id(&iterator, hypertable_id);
ts_scanner_foreach(&iterator)
{
bool isnull;
heap_getattr(ts_scan_iterator_tuple_info(&iterator)->tuple,
Anum_chunk_compressed_chunk_id,
ts_scan_iterator_tuple_info(&iterator)->desc,
&isnull);
if (!isnull)
{
found = true;
break;
}
}
ts_scan_iterator_close(&iterator);
return found;
}
static ChunkResult
chunk_recreate_constraint(ChunkScanCtx *ctx, Chunk *chunk)
{
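
The new ts_chunk_exists_with_compression() scans the chunk catalog through the hypertable_id index and returns true as soon as it finds a row whose compressed_chunk_id is non-null. Conceptually it answers the same question as this catalog query (a sketch only; the id 8 is a placeholder for the hypertable's catalog id):

SELECT EXISTS (
    SELECT 1 FROM _timescaledb_catalog.chunk
    WHERE hypertable_id = 8 AND compressed_chunk_id IS NOT NULL);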

View File

@@ -101,6 +101,8 @@ extern TSDLLEXPORT Chunk *ts_chunk_get_by_relid(Oid relid, int16 num_constraints
bool fail_if_not_found);
extern bool ts_chunk_exists(const char *schema_name, const char *table_name);
extern bool ts_chunk_exists_relid(Oid relid);
extern TSDLLEXPORT bool ts_chunk_exists_with_compression(int32 hypertable_id);
extern void ts_chunk_recreate_all_constraints_for_dimension(Hyperspace *hs, int32 dimension_id);
extern int ts_chunk_delete_by_relid(Oid chunk_oid);
extern int ts_chunk_delete_by_hypertable_id(int32 hypertable_id);

View File

@@ -454,7 +454,7 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
ts_hypertable_drop_trigger(raw_hypertable, CAGGINVAL_TRIGGER_NAME);
/* delete the materialization table */
ts_hypertable_drop(mat_hypertable);
ts_hypertable_drop(mat_hypertable, DROP_CASCADE);
if (OidIsValid(partial_view.objectId))
performDeletion(&partial_view, DROP_RESTRICT, 0);

View File

@@ -593,7 +593,7 @@ ts_hypertable_delete_by_name(const char *schema_name, const char *table_name)
}
void
ts_hypertable_drop(Hypertable *hypertable)
ts_hypertable_drop(Hypertable *hypertable, DropBehavior behavior)
{
ObjectAddress hypertable_addr = (ObjectAddress){
.classId = RelationRelationId,
@@ -601,7 +601,7 @@ ts_hypertable_drop(Hypertable *hypertable)
};
/* Drop the postgres table */
performDeletion(&hypertable_addr, DROP_CASCADE, 0);
performDeletion(&hypertable_addr, behavior, 0);
/* Clean up catalog */
ts_hypertable_delete_by_name(hypertable->fd.schema_name.data, hypertable->fd.table_name.data);
}

View File

@@ -92,7 +92,7 @@ extern int ts_hypertable_delete_by_name(const char *schema_name, const char *tab
extern TSDLLEXPORT ObjectAddress ts_hypertable_create_trigger(Hypertable *ht, CreateTrigStmt *stmt,
const char *query);
extern TSDLLEXPORT void ts_hypertable_drop_trigger(Hypertable *ht, const char *trigger_name);
extern void ts_hypertable_drop(Hypertable *hypertable);
extern TSDLLEXPORT void ts_hypertable_drop(Hypertable *hypertable, DropBehavior behavior);
extern int ts_hypertable_reset_associated_schema_name(const char *associated_schema);
extern TSDLLEXPORT Oid ts_hypertable_id_to_relid(int32 hypertable_id);

View File

@@ -108,3 +108,26 @@ get_hypertablecompression_info(int32 htid)
}
return fdlist;
}
bool
hypertable_compression_delete_by_hypertable_id(int32 htid)
{
int count = 0;
ScanIterator iterator =
ts_scan_iterator_create(HYPERTABLE_COMPRESSION, RowExclusiveLock, CurrentMemoryContext);
iterator.ctx.index =
catalog_get_index(ts_catalog_get(), HYPERTABLE_COMPRESSION, HYPERTABLE_COMPRESSION_PKEY);
ts_scan_iterator_scan_key_init(&iterator,
Anum_hypertable_compression_pkey_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(htid));
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
ts_catalog_delete(ti->scanrel, ti->tuple);
count++;
}
return count > 0;
}
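
hypertable_compression_delete_by_hypertable_id() clears a hypertable's per-column compression settings so that the alter path can re-insert them with the new options. Its effect corresponds to this statement (illustrative only; the C code deletes through the catalog API under RowExclusiveLock, and 8 is a placeholder id):

DELETE FROM _timescaledb_catalog.hypertable_compression WHERE hypertable_id = 8;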

View File

@@ -16,4 +16,6 @@ extern TSDLLEXPORT void
hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls);
extern TSDLLEXPORT bool hypertable_compression_delete_by_hypertable_id(int32 htid);
#endif

View File

@@ -2680,7 +2680,6 @@ process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht)
List *pg_options = NIL, *compress_options = NIL;
WithClauseResult *parse_results = NULL;
List *inpdef = NIL;
bool is_compress = false;
/* is this a compress table stmt */
Assert(IsA(cmd->def, List));
inpdef = (List *) cmd->def;
@@ -2688,10 +2687,13 @@ process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht)
if (compress_options)
{
parse_results = ts_compress_hypertable_set_clause_parse(compress_options);
is_compress = DatumGetBool(parse_results[CompressEnabled].parsed);
if (parse_results[CompressEnabled].is_default)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("must set the 'compress' boolean option when setting compression "
"options")));
}
if (!is_compress)
else
return false;
if (pg_options != NIL)
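
With this change, setting compression options without the timescaledb.compress boolean is now an immediate error rather than being silently ignored, as the updated regression test shows:

ALTER TABLE foo2 SET (timescaledb.compress_segmentby = '"bacB toD",c',
    timescaledb.compress_orderby = 'c');
-- ERROR: must set the 'compress' boolean option when setting compression options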

View File

@@ -404,6 +404,11 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
AttrNumber count_metadata_column_num = attno_find_by_attname(out_desc, count_metadata_name);
int16 count_metadata_column_offset;
Oid internal_schema_oid = LookupExplicitNamespace(INTERNAL_SCHEMA_NAME, false);
Oid compressed_data_type_oid = GetSysCacheOid2(TYPENAMENSP,
PointerGetDatum("compressed_data"),
ObjectIdGetDatum(internal_schema_oid));
if (count_metadata_column_num == InvalidAttrNumber)
elog(ERROR,
"missing metadata column '%s' in compressed table",
@@ -436,11 +441,17 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
Form_pg_attribute column_attr = TupleDescAttr(uncompressed_tuple_desc, in_column_offset);
AttrNumber compressed_colnum =
attno_find_by_attname(out_desc, (Name) &compression_info->attname);
Form_pg_attribute compressed_column_attr =
TupleDescAttr(out_desc, AttrNumberGetAttrOffset(compressed_colnum));
row_compressor->uncompressed_col_to_compressed_col[in_column_offset] =
AttrNumberGetAttrOffset(compressed_colnum);
Assert(AttrNumberGetAttrOffset(compressed_colnum) < num_columns_in_compressed_table);
if (!COMPRESSIONCOL_IS_SEGMENT_BY(compression_info))
{
if (compressed_column_attr->atttypid != compressed_data_type_oid)
elog(ERROR,
"expected column '%s' to be a compressed data type",
compression_info->attname.data);
*column = (PerColumn){
.compressor = compressor_for_algorithm_and_type(compression_info->algo_id,
column_attr->atttypid),
@@ -448,6 +459,10 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
}
else
{
if (column_attr->atttypid != compressed_column_attr->atttypid)
elog(ERROR,
"expected segment by column '%s' to be same type as uncompressed column",
compression_info->attname.data);
*column = (PerColumn){
.segment_info = segment_info_new(column_attr),
};
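
These new checks validate the compressed table's layout: a non-segmentby column must have the internal compressed_data type, while a segmentby column must keep the type of the uncompressed column. One way to eyeball that layout from SQL, assuming the usual internal naming (the table name _timescaledb_internal._compressed_hypertable_2 is a hypothetical example):

SELECT attname, format_type(atttypid, atttypmod) AS coltype
FROM pg_attribute
WHERE attrelid = '_timescaledb_internal._compressed_hypertable_2'::regclass
    AND attnum > 0
ORDER BY attnum;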

View File

@@ -394,18 +394,57 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
{
int32 compress_htid;
struct CompressColInfo compress_cols;
bool compress_enable = DatumGetBool(with_clause_options[CompressEnabled].parsed);
Oid ownerid = ts_rel_get_owner(ht->main_table_relid);
List *segmentby_cols = ts_compress_hypertable_parse_segment_by(with_clause_options, ht);
List *orderby_cols = ts_compress_hypertable_parse_order_by(with_clause_options, ht);
orderby_cols = add_time_to_order_by_if_not_included(orderby_cols, segmentby_cols, ht);
bool compression_already_enabled = ht->fd.compressed_hypertable_id != INVALID_HYPERTABLE_ID;
bool compressed_chunks_exist =
compression_already_enabled && ts_chunk_exists_with_compression(ht->fd.id);
/* we need an AccessShare lock on the hypertable so that there are
* no DDL changes while we create the compressed hypertable
*/
LockRelationOid(ht->main_table_relid, AccessShareLock);
if (!compress_enable)
{
if (compression_already_enabled)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("disabling compression is not yet supported")));
/* compression is not enabled, so just return */
return false;
}
if (compressed_chunks_exist)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot change compression options if compressed chunks exist")));
/* Require both order by and segment by when altering because otherwise it's not clear what
* the default value means: does it mean leave as-is or is it an empty list. */
if (compression_already_enabled && (with_clause_options[CompressOrderBy].is_default ||
with_clause_options[CompressSegmentBy].is_default))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("need to specify both compress_orderby and compress_groupby if altering "
"compression")));
compresscolinfo_init(&compress_cols, ht->main_table_relid, segmentby_cols, orderby_cols);
LockRelationOid(ht->main_table_relid, AccessShareLock);
if (compression_already_enabled)
{
Hypertable *compressed = ts_hypertable_get_by_id(ht->fd.compressed_hypertable_id);
if (compressed == NULL)
elog(ERROR, "compression enabled but no compressed hypertable found");
/* need to drop the old compressed hypertable in case the segment by columns changed (and
* thus the column types of compressed hypertable need to change) */
ts_hypertable_drop(compressed, DROP_RESTRICT);
hypertable_compression_delete_by_hypertable_id(ht->fd.id);
}
compress_htid = create_compression_table(ownerid, &compress_cols);
/* block concurrent DDL that creates same compressed hypertable */
LockRelationOid(catalog_get_table_id(ts_catalog_get(), HYPERTABLE), RowExclusiveLock);
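
The alter path now rejects disabling compression, rejects any change once compressed chunks exist, and requires both compress_orderby and compress_segmentby to be respecified when altering (so a defaulted option is not ambiguous between "leave as-is" and "empty list"). On a clean alter it drops the old compressed hypertable with DROP_RESTRICT and deletes its hypertable_compression rows before recreating both. The rejected cases, as exercised by the tests below:

ALTER TABLE foo2 SET (timescaledb.compress = 'f');
-- ERROR: disabling compression is not yet supported
ALTER TABLE foo2 SET (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c');
-- ERROR: need to specify both compress_orderby and compress_groupby if altering compression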

View File

@@ -28,12 +28,12 @@ NOTICE: adding not-null constraint to column "a"
(1 row)
insert into non_compressed values( 3 , 16 , 20, 4);
ALTER TABLE foo2 set (timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ERROR: must set the 'compress' boolean option when setting compression options
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ERROR: cannot use the same column c in compress_orderby and compress_segmentby
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');
--TODO: allow changing the options if not chunks compressed
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd DESC');
ERROR: duplicate key value violates unique constraint "hypertable_compression_pkey"
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');
--note that the time column "a" should be added to the end of the orderby list
select * from _timescaledb_catalog.hypertable_compression order by attname;
hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst
@@ -46,6 +46,10 @@ select * from _timescaledb_catalog.hypertable_compression order by attname;
ALTER TABLE foo3 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd DeSc NullS lAsT');
-- Negative test cases ---
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c');
ERROR: need to specify both compress_orderby and compress_groupby if altering compression
ALTER TABLE foo2 set (timescaledb.compress='f');
ERROR: disabling compression is not yet supported
create table reserved_column_prefix (a integer, _ts_meta_foo integer, "bacB toD" integer, c integer, d integer);
select table_name from create_hypertable('reserved_column_prefix', 'a', chunk_time_interval=> 10);
NOTICE: adding not-null constraint to column "a"
@@ -138,6 +142,8 @@ select hc.* from _timescaledb_catalog.hypertable_compression hc inner join _time
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'foo' limit 1;
ERROR: chunk "_hyper_8_2_chunk" is not a compressed
--test changing the segment by columns
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'a', timescaledb.compress_segmentby = 'b');
--should succeed
select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'foo' limit 1;
@@ -152,6 +158,27 @@ ERROR: chunk is already compressed
select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'non_compressed' limit 1;
ERROR: chunks can be compressed only if compression property is set on the hypertable
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'a', timescaledb.compress_segmentby = 'c');
ERROR: cannot change compression options if compressed chunks exist
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'non_compressed' limit 1;
ERROR: missing compressed hypertable
--should succeed
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'foo' and ch1.compressed_chunk_id IS NOT NULL;
decompress_chunk
------------------
(1 row)
--should succeed
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'a', timescaledb.compress_segmentby = 'b');
select hc.* from _timescaledb_catalog.hypertable_compression hc inner join _timescaledb_catalog.hypertable h on (h.id = hc.hypertable_id) where h.table_name = 'foo' order by attname;
hypertable_id | attname | compression_algorithm_id | segmentby_column_index | orderby_column_index | orderby_asc | orderby_nullsfirst
---------------+---------+--------------------------+------------------------+----------------------+-------------+--------------------
8 | a | 4 | | 1 | t | f
8 | b | 0 | 1 | | |
8 | c | 4 | | | |
8 | t | 2 | | | |
(4 rows)

View File

@@ -15,10 +15,10 @@ create table non_compressed (a integer, "bacB toD" integer, c integer, d integer
select table_name from create_hypertable('non_compressed', 'a', chunk_time_interval=> 10);
insert into non_compressed values( 3 , 16 , 20, 4);
ALTER TABLE foo2 set (timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');
--TODO: allow changing the options if not chunks compressed
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd DESC');
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');
--note that the time column "a" should be added to the end of the orderby list
select * from _timescaledb_catalog.hypertable_compression order by attname;
@@ -27,6 +27,10 @@ ALTER TABLE foo3 set (timescaledb.compress, timescaledb.compress_segmentby = '"b
-- Negative test cases ---
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c');
ALTER TABLE foo2 set (timescaledb.compress='f');
create table reserved_column_prefix (a integer, _ts_meta_foo integer, "bacB toD" integer, c integer, d integer);
select table_name from create_hypertable('reserved_column_prefix', 'a', chunk_time_interval=> 10);
ALTER TABLE reserved_column_prefix set (timescaledb.compress);
@@ -78,6 +82,9 @@ select hc.* from _timescaledb_catalog.hypertable_compression hc inner join _time
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'foo' limit 1;
--test changing the segment by columns
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'a', timescaledb.compress_segmentby = 'b');
--should succeed
select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'foo' limit 1;
@@ -88,5 +95,16 @@ FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch
select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'non_compressed' limit 1;
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'a', timescaledb.compress_segmentby = 'c');
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'non_compressed' limit 1;
--should succeed
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'foo' and ch1.compressed_chunk_id IS NOT NULL;
--should succeed
ALTER TABLE foo set (timescaledb.compress, timescaledb.compress_orderby = 'a', timescaledb.compress_segmentby = 'b');
select hc.* from _timescaledb_catalog.hypertable_compression hc inner join _timescaledb_catalog.hypertable h on (h.id = hc.hypertable_id) where h.table_name = 'foo' order by attname;