diff --git a/sql/bgw_scheduler.sql b/sql/bgw_scheduler.sql index 11f164ccc..efa6d0a58 100644 --- a/sql/bgw_scheduler.sql +++ b/sql/bgw_scheduler.sql @@ -30,6 +30,11 @@ CREATE OR REPLACE FUNCTION add_reorder_policy(hypertable REGCLASS, index_name NA AS '@MODULE_PATHNAME@', 'ts_add_reorder_policy' LANGUAGE C VOLATILE STRICT; +CREATE OR REPLACE FUNCTION add_compress_chunks_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false) +RETURNS INTEGER +AS '@MODULE_PATHNAME@', 'ts_add_compress_chunks_policy' +LANGUAGE C VOLATILE STRICT; + CREATE OR REPLACE FUNCTION remove_drop_chunks_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_remove_drop_chunks_policy' LANGUAGE C VOLATILE STRICT; @@ -38,6 +43,10 @@ CREATE OR REPLACE FUNCTION remove_reorder_policy(hypertable REGCLASS, if_exists AS '@MODULE_PATHNAME@', 'ts_remove_reorder_policy' LANGUAGE C VOLATILE STRICT; +CREATE OR REPLACE FUNCTION remove_compress_chunks_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL +AS '@MODULE_PATHNAME@', 'ts_remove_compress_chunks_policy' +LANGUAGE C VOLATILE STRICT; + -- Returns the updated job schedule values CREATE OR REPLACE FUNCTION alter_job_schedule( job_id INTEGER, diff --git a/sql/pre_install/tables.sql b/sql/pre_install/tables.sql index 85610ee11..ca07cdc52 100644 --- a/sql/pre_install/tables.sql +++ b/sql/pre_install/tables.sql @@ -174,7 +174,7 @@ CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_job ( max_runtime INTERVAL NOT NULL, max_retries INT NOT NULL, retry_period INTERVAL NOT NULL, - CONSTRAINT valid_job_type CHECK (job_type IN ('telemetry_and_version_check_if_enabled', 'reorder', 'drop_chunks', 'continuous_aggregate')) + CONSTRAINT valid_job_type CHECK (job_type IN ('telemetry_and_version_check_if_enabled', 'reorder', 'drop_chunks', 'continuous_aggregate', 'compress_chunks')) ); ALTER SEQUENCE _timescaledb_config.bgw_job_id_seq OWNED BY _timescaledb_config.bgw_job.id; @@ -332,14 +332,14 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size ( ); SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', ''); -CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_compress_chunks_policy( - hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE, - older_than BIGINT NOT NULL, - job_id SMALLINT REFERENCES _timescaledb_config.bgw_job(id), - UNIQUE (hypertable_id, job_id) +CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_policy_compress_chunks( + job_id INTEGER PRIMARY KEY REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE, + hypertable_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE, + older_than _timescaledb_catalog.ts_interval NOT NULL, + CONSTRAINT valid_older_than CHECK(_timescaledb_internal.valid_ts_interval(older_than)) ); -SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_compress_chunks_policy', ''); +SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_compress_chunks', ''); -- Set table permissions diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index c5f095eef..9e66623c8 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -156,20 +156,19 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size ( ); SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', ''); -CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_compress_chunks_policy( - hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE, - older_than BIGINT NOT NULL, - job_id SMALLINT REFERENCES _timescaledb_config.bgw_job(id), - UNIQUE (hypertable_id, job_id) +CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_policy_compress_chunks( + job_id INTEGER PRIMARY KEY REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE, + hypertable_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE, + older_than _timescaledb_catalog.ts_interval NOT NULL, + CONSTRAINT valid_older_than CHECK(_timescaledb_internal.valid_ts_interval(older_than)) ); -SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_compress_chunks_policy', ''); - +SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_compress_chunks', ''); GRANT SELECT ON _timescaledb_catalog.compression_algorithm TO PUBLIC; GRANT SELECT ON _timescaledb_catalog.hypertable_compression TO PUBLIC; GRANT SELECT ON _timescaledb_catalog.compression_chunk_size TO PUBLIC; -GRANT SELECT ON _timescaledb_config.bgw_compress_chunks_policy TO PUBLIC; +GRANT SELECT ON _timescaledb_config.bgw_policy_compress_chunks TO PUBLIC; CREATE TYPE _timescaledb_internal.compressed_data; diff --git a/src/bgw/job.c b/src/bgw/job.c index 0cdd21851..04bbd0f2e 100644 --- a/src/bgw/job.c +++ b/src/bgw/job.c @@ -27,6 +27,7 @@ #include "telemetry/telemetry.h" #include "bgw_policy/chunk_stats.h" #include "bgw_policy/drop_chunks.h" +#include "bgw_policy/compress_chunks.h" #include "bgw_policy/reorder.h" #include "scan_iterator.h" @@ -39,6 +40,7 @@ static const char *job_type_names[_MAX_JOB_TYPE] = { [JOB_TYPE_REORDER] = "reorder", [JOB_TYPE_DROP_CHUNKS] = "drop_chunks", [JOB_TYPE_CONTINUOUS_AGGREGATE] = "continuous_aggregate", + [JOB_TYPE_COMPRESS_CHUNKS] = "compress_chunks", [JOB_TYPE_UNKNOWN] = "unknown", }; @@ -110,6 +112,13 @@ ts_bgw_job_owner(BgwJob *job) return ts_rel_get_owner(ts_continuous_agg_get_user_view_oid(ca)); } + case JOB_TYPE_COMPRESS_CHUNKS: + { + BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_job(job->fd.id); + if (policy == NULL) + elog(ERROR, "compress chunks policy for job with id \"%d\" not found", job->fd.id); + return ts_rel_get_owner(ts_hypertable_id_to_relid(policy->fd.hypertable_id)); + } case JOB_TYPE_UNKNOWN: if (unknown_job_type_owner_hook != NULL) return unknown_job_type_owner_hook(job); @@ -473,6 +482,7 @@ ts_bgw_job_execute(BgwJob *job) case JOB_TYPE_REORDER: case JOB_TYPE_DROP_CHUNKS: case JOB_TYPE_CONTINUOUS_AGGREGATE: + case JOB_TYPE_COMPRESS_CHUNKS: return ts_cm_functions->bgw_policy_job_execute(job); case JOB_TYPE_UNKNOWN: if (unknown_job_type_hook != NULL) diff --git a/src/bgw/job.h b/src/bgw/job.h index 002510e2f..5f3e000fb 100644 --- a/src/bgw/job.h +++ b/src/bgw/job.h @@ -19,6 +19,7 @@ typedef enum JobType JOB_TYPE_REORDER, JOB_TYPE_DROP_CHUNKS, JOB_TYPE_CONTINUOUS_AGGREGATE, + JOB_TYPE_COMPRESS_CHUNKS, /* end of real jobs */ JOB_TYPE_UNKNOWN, _MAX_JOB_TYPE diff --git a/src/bgw/job_stat.h b/src/bgw/job_stat.h index d9754895c..565095ec5 100644 --- a/src/bgw/job_stat.h +++ b/src/bgw/job_stat.h @@ -22,7 +22,7 @@ typedef enum JobResult extern TSDLLEXPORT BgwJobStat *ts_bgw_job_stat_find(int job_id); extern void ts_bgw_job_stat_delete(int job_id); -extern void ts_bgw_job_stat_mark_start(int32 bgw_job_id); +extern TSDLLEXPORT void ts_bgw_job_stat_mark_start(int32 bgw_job_id); extern void ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result); extern bool ts_bgw_job_stat_end_was_marked(BgwJobStat *jobstat); diff --git a/src/bgw_policy/CMakeLists.txt b/src/bgw_policy/CMakeLists.txt index 0e35ac7d2..83b2530ac 100644 --- a/src/bgw_policy/CMakeLists.txt +++ b/src/bgw_policy/CMakeLists.txt @@ -1,6 +1,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/reorder.c ${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks.c + ${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks.c ${CMAKE_CURRENT_SOURCE_DIR}/policy.c ${CMAKE_CURRENT_SOURCE_DIR}/chunk_stats.c ) diff --git a/src/bgw_policy/compress_chunks.c b/src/bgw_policy/compress_chunks.c new file mode 100644 index 000000000..784384201 --- /dev/null +++ b/src/bgw_policy/compress_chunks.c @@ -0,0 +1,157 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. + * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. + */ + +#include +#include + +#include "bgw/job.h" +#include "catalog.h" +#include "compress_chunks.h" +#include "hypertable.h" +#include "interval.h" +#include "policy.h" +#include "scanner.h" +#include "scan_iterator.h" +#include "utils.h" + +static ScanTupleResult +bgw_policy_compress_chunks_tuple_found(TupleInfo *ti, void *const data) +{ + BgwPolicyCompressChunks **policy = data; + bool nulls[Natts_bgw_policy_compress_chunks]; + Datum values[Natts_bgw_policy_compress_chunks]; + + heap_deform_tuple(ti->tuple, ti->desc, values, nulls); + + *policy = MemoryContextAllocZero(ti->mctx, sizeof(BgwPolicyCompressChunks)); + Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]); + (*policy)->fd.job_id = + DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]); + + Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]); + (*policy)->fd.hypertable_id = DatumGetInt32( + values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]); + + Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]); + + (*policy)->fd.older_than = *ts_interval_from_tuple( + values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]); + + return SCAN_CONTINUE; +} + +static ScanTupleResult +compress_policy_delete_row_tuple_found(TupleInfo *ti, void *const data) +{ + CatalogSecurityContext sec_ctx; + + ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); + ts_catalog_delete(ti->scanrel, ti->tuple); + ts_catalog_restore_user(&sec_ctx); + + return SCAN_CONTINUE; +} + +/*deletes only from the compress_chunks policy table. need to remove the job separately */ +bool +ts_bgw_policy_compress_chunks_delete(int32 job_id) +{ + ScanKeyData scankey[1]; + + ScanKeyInit(&scankey[0], + Anum_bgw_policy_compress_chunks_pkey_job_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(job_id)); + + return ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS, + BGW_POLICY_COMPRESS_CHUNKS_PKEY, + scankey, + 1, + compress_policy_delete_row_tuple_found, + RowExclusiveLock, + BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, + NULL); +} + +BgwPolicyCompressChunks * +ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id) +{ + ScanKeyData scankey[1]; + BgwPolicyCompressChunks *ret = NULL; + + ScanKeyInit(&scankey[0], + Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(hypertable_id)); + + ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS, + BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY, + scankey, + 1, + bgw_policy_compress_chunks_tuple_found, + RowExclusiveLock, + BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, + (void *) &ret); + + return ret; +} + +void +ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy) +{ + TupleDesc tupdesc; + CatalogSecurityContext sec_ctx; + Datum values[Natts_bgw_policy_compress_chunks]; + bool nulls[Natts_bgw_policy_compress_chunks] = { false }; + HeapTuple ht_older_than; + Catalog *catalog = ts_catalog_get(); + Relation rel = + heap_open(catalog_get_table_id(catalog, BGW_POLICY_COMPRESS_CHUNKS), RowExclusiveLock); + + tupdesc = RelationGetDescr(rel); + + values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)] = + Int32GetDatum(policy->fd.job_id); + values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)] = + Int32GetDatum(policy->fd.hypertable_id); + + ht_older_than = ts_interval_form_heaptuple(&policy->fd.older_than); + + values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)] = + HeapTupleGetDatum(ht_older_than); + + ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); + ts_catalog_insert_values(rel, tupdesc, values, nulls); + ts_catalog_restore_user(&sec_ctx); + heap_freetuple(ht_older_than); + heap_close(rel, RowExclusiveLock); +} + +BgwPolicyCompressChunks * +ts_bgw_policy_compress_chunks_find_by_job(int32 job_id) +{ + ScanKeyData scankey[1]; + BgwPolicyCompressChunks *ret = NULL; + + ScanKeyInit(&scankey[0], + Anum_bgw_policy_compress_chunks_pkey_job_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(job_id)); + + ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS, + BGW_POLICY_COMPRESS_CHUNKS_PKEY, + scankey, + 1, + bgw_policy_compress_chunks_tuple_found, + RowExclusiveLock, + BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, + (void *) &ret); + + return ret; +} diff --git a/src/bgw_policy/compress_chunks.h b/src/bgw_policy/compress_chunks.h new file mode 100644 index 000000000..c7b4deb99 --- /dev/null +++ b/src/bgw_policy/compress_chunks.h @@ -0,0 +1,23 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. + * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. + */ + +#ifndef TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H +#define TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H + +#include "catalog.h" +#include "export.h" + +typedef struct BgwPolicyCompressChunks +{ + FormData_bgw_policy_compress_chunks fd; +} BgwPolicyCompressChunks; + +extern TSDLLEXPORT BgwPolicyCompressChunks *ts_bgw_policy_compress_chunks_find_by_job(int32 job_id); +extern TSDLLEXPORT BgwPolicyCompressChunks * +ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id); +extern TSDLLEXPORT void ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy); +extern TSDLLEXPORT bool ts_bgw_policy_compress_chunks_delete(int32 job_id); +#endif /* TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H */ diff --git a/src/catalog.c b/src/catalog.c index ab44dd780..3fdd33218 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -106,9 +106,9 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = { .schema_name = CATALOG_SCHEMA_NAME, .table_name = COMPRESSION_CHUNK_SIZE_TABLE_NAME, }, - [BGW_COMPRESS_CHUNKS_POLICY] = { + [BGW_POLICY_COMPRESS_CHUNKS] = { .schema_name = CONFIG_SCHEMA_NAME, - .table_name = BGW_COMPRESS_CHUNKS_POLICY_TABLE_NAME, + .table_name = BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, }, [_MAX_CATALOG_TABLES] = { .schema_name = "invalid schema", @@ -250,10 +250,11 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES] [COMPRESSION_CHUNK_SIZE_PKEY] = "compression_chunk_size_pkey", }, }, - [BGW_COMPRESS_CHUNKS_POLICY] = { - .length = _MAX_BGW_COMPRESS_CHUNKS_POLICY_INDEX, + [BGW_POLICY_COMPRESS_CHUNKS] = { + .length = _MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX, .names = (char *[]) { - [BGW_COMPRESS_CHUNKS_POLICY_HYPERTABLE_ID_JOB_ID_KEY] = "bgw_compress_chunks_policy_hypertable_id_job_id_key", + [BGW_POLICY_COMPRESS_CHUNKS_PKEY] = "bgw_policy_compress_chunks_pkey", + [BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY] = "bgw_policy_compress_chunks_hypertable_id_key", }, }, }; @@ -276,7 +277,7 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = { [CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG] = NULL, [HYPERTABLE_COMPRESSION] = NULL, [COMPRESSION_CHUNK_SIZE] = NULL, - [BGW_COMPRESS_CHUNKS_POLICY] = NULL, + [BGW_POLICY_COMPRESS_CHUNKS] = NULL, }; typedef struct InternalFunctionDef diff --git a/src/catalog.h b/src/catalog.h index 579a4de08..b918b82c6 100644 --- a/src/catalog.h +++ b/src/catalog.h @@ -54,7 +54,7 @@ typedef enum CatalogTable CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG, HYPERTABLE_COMPRESSION, COMPRESSION_CHUNK_SIZE, - BGW_COMPRESS_CHUNKS_POLICY, + BGW_POLICY_COMPRESS_CHUNKS, _MAX_CATALOG_TABLES, } CatalogTable; @@ -1132,40 +1132,49 @@ typedef enum Anum_compression_chunk_size_pkey #define Natts_compression_chunk_size_pkey (_Anum_compression_chunk_size_pkey_max - 1) -#define BGW_COMPRESS_CHUNKS_POLICY_TABLE_NAME "bgw_compress_chunks_policy" -typedef enum Anum_bgw_compress_chunks_policy +#define BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME "bgw_policy_compress_chunks" +typedef enum Anum_bgw_policy_compress_chunks { - Anum_bgw_compress_chunks_policy_hypertable_id = 1, - Anum_bgw_compress_chunks_policy_older_than, - Anum_bgw_compress_chunks_policy_job_id, - _Anum_bgw_compress_chunks_policy_max, -} Anum_bgw_compress_chunks_policy; + Anum_bgw_policy_compress_chunks_job_id = 1, + Anum_bgw_policy_compress_chunks_hypertable_id, + Anum_bgw_policy_compress_chunks_older_than, + _Anum_bgw_policy_compress_chunks_max, +} Anum_bgw_policy_compress_chunks; -#define Natts_bgw_compress_chunks_policy (_Anum_bgw_compress_chunks_policy_max - 1) +#define Natts_bgw_policy_compress_chunks (_Anum_bgw_policy_compress_chunks_max - 1) -typedef struct FormData_bgw_compress_chunks_policy +typedef struct FormData_bgw_policy_compress_chunks { + int32 job_id; int32 hypertable_id; - int64 older_than; - int16 job_id; -} FormData_bgw_compress_chunks_policy; + FormData_ts_interval older_than; +} FormData_bgw_policy_compress_chunks; -typedef FormData_bgw_compress_chunks_policy *Form_bgw_compress_chunks_policy; +typedef FormData_bgw_policy_compress_chunks *Form_bgw_policy_compress_chunks; enum { - BGW_COMPRESS_CHUNKS_POLICY_HYPERTABLE_ID_JOB_ID_KEY = 0, - _MAX_BGW_COMPRESS_CHUNKS_POLICY_INDEX, + BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY = 0, + BGW_POLICY_COMPRESS_CHUNKS_PKEY, + _MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX, }; -typedef enum Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key -{ - Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_hypertable_id = 1, - Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_job_id, - _Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_max, -} Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key; -#define Natts_bgw_compress_chunks_policy_hypertable_id_job_id_key \ - (_Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_max - 1) +typedef enum Anum_bgw_policy_compress_chunks_hypertable_id_key +{ + Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id = 1, + _Anum_bgw_policy_compress_chunks_hypertable_id_key_max, +} Anum_bgw_policy_compress_chunks_hypertable_id_key; + +#define Natts_bgw_policy_compress_chunks_hypertable_id_key \ + (_Anum_bgw_policy_compress_chunks_hypertable_id_key_max - 1) + +typedef enum Anum_bgw_policy_compress_chunks_pkey +{ + Anum_bgw_policy_compress_chunks_pkey_job_id = 1, + _Anum_bgw_policy_compress_chunks_pkey_max, +} Anum_bgw_policy_compress_chunks_pkey; + +#define Natts_bgw_policy_compress_chunks_pkey (_Anum_bgw_policy_compress_chunks_pkey_max - 1) /* * The maximum number of indexes a catalog table can have. diff --git a/src/chunk.c b/src/chunk.c index de446d59f..57ceeece1 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -2310,3 +2310,27 @@ ts_chunks_in(PG_FUNCTION_ARGS) "with AND operator"))); pg_unreachable(); } + +/* has chunk ,specifieid by chunk_id, been compressed */ +bool +ts_chunk_is_compressed(int32 chunk_id) +{ + bool compressed = false; + ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext); + iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX); + ts_scan_iterator_scan_key_init(&iterator, + Anum_chunk_idx_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(chunk_id)); + + ts_scanner_foreach(&iterator) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); + bool isnull; + heap_getattr(ti->tuple, Anum_chunk_compressed_chunk_id, ti->desc, &isnull); + compressed = !isnull; // isnull is false when compress chunk id is set + } + ts_scan_iterator_close(&iterator); + return compressed; +} diff --git a/src/chunk.h b/src/chunk.h index 4d1fadccc..517c18a25 100644 --- a/src/chunk.h +++ b/src/chunk.h @@ -119,6 +119,7 @@ extern TSDLLEXPORT List *ts_chunk_do_drop_chunks(Oid table_relid, Datum older_th Oid newer_than_type, bool cascade, bool cascades_to_materializations, int32 log_level); +extern TSDLLEXPORT bool ts_chunk_is_compressed(int32 chunk_id); #define chunk_get_by_name(schema_name, table_name, num_constraints, fail_if_not_found) \ ts_chunk_get_by_name_with_memory_context(schema_name, \ diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 68b4faedf..a24ce86aa 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -15,8 +15,10 @@ TS_FUNCTION_INFO_V1(ts_add_drop_chunks_policy); TS_FUNCTION_INFO_V1(ts_add_reorder_policy); +TS_FUNCTION_INFO_V1(ts_add_compress_chunks_policy); TS_FUNCTION_INFO_V1(ts_remove_drop_chunks_policy); TS_FUNCTION_INFO_V1(ts_remove_reorder_policy); +TS_FUNCTION_INFO_V1(ts_remove_compress_chunks_policy); TS_FUNCTION_INFO_V1(ts_alter_job_schedule); TS_FUNCTION_INFO_V1(ts_reorder_chunk); TS_FUNCTION_INFO_V1(ts_move_chunk); @@ -55,6 +57,12 @@ ts_add_reorder_policy(PG_FUNCTION_ARGS) PG_RETURN_DATUM(ts_cm_functions->add_reorder_policy(fcinfo)); } +Datum +ts_add_compress_chunks_policy(PG_FUNCTION_ARGS) +{ + PG_RETURN_DATUM(ts_cm_functions->add_compress_chunks_policy(fcinfo)); +} + Datum ts_remove_drop_chunks_policy(PG_FUNCTION_ARGS) { @@ -67,6 +75,12 @@ ts_remove_reorder_policy(PG_FUNCTION_ARGS) PG_RETURN_DATUM(ts_cm_functions->remove_reorder_policy(fcinfo)); } +Datum +ts_remove_compress_chunks_policy(PG_FUNCTION_ARGS) +{ + return ts_cm_functions->remove_compress_chunks_policy(fcinfo); +} + Datum ts_alter_job_schedule(PG_FUNCTION_ARGS) { @@ -388,8 +402,10 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .continuous_agg_materialize = cagg_materialize_default_fn, .add_drop_chunks_policy = error_no_default_fn_pg_enterprise, .add_reorder_policy = error_no_default_fn_pg_enterprise, + .add_compress_chunks_policy = error_no_default_fn_pg_enterprise, .remove_drop_chunks_policy = error_no_default_fn_pg_enterprise, .remove_reorder_policy = error_no_default_fn_pg_enterprise, + .remove_compress_chunks_policy = error_no_default_fn_pg_enterprise, .create_upper_paths_hook = NULL, .set_rel_pathlist_hook = NULL, .gapfill_marker = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 3a4215943..67d72f0dd 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -40,8 +40,10 @@ typedef struct CrossModuleFunctions bool (*continuous_agg_materialize)(int32 materialization_id, bool verbose); Datum (*add_drop_chunks_policy)(PG_FUNCTION_ARGS); Datum (*add_reorder_policy)(PG_FUNCTION_ARGS); + Datum (*add_compress_chunks_policy)(PG_FUNCTION_ARGS); Datum (*remove_drop_chunks_policy)(PG_FUNCTION_ARGS); Datum (*remove_reorder_policy)(PG_FUNCTION_ARGS); + Datum (*remove_compress_chunks_policy)(PG_FUNCTION_ARGS); void (*create_upper_paths_hook)(PlannerInfo *, UpperRelationKind, RelOptInfo *, RelOptInfo *); void (*set_rel_pathlist_hook)(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *, Hypertable *); diff --git a/src/dimension_slice.c b/src/dimension_slice.c index 9a7f95e70..8c76d22de 100644 --- a/src/dimension_slice.c +++ b/src/dimension_slice.c @@ -778,3 +778,44 @@ ts_dimension_slice_oldest_chunk_without_executed_job(int32 job_id, int32 dimensi return info.chunk_id; } + +static ScanTupleResult +dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *data) +{ + ListCell *lc; + DimensionSlice *slice = dimension_slice_from_tuple(ti->tuple); + List *chunk_ids = NIL; + + ts_chunk_constraint_scan_by_dimension_slice_to_list(slice, &chunk_ids, CurrentMemoryContext); + + foreach (lc, chunk_ids) + { + int32 chunk_id = lfirst_int(lc); + if (!ts_chunk_is_compressed(chunk_id)) + { + /* found a chunk that has not yet been compressed */ + *((int32 *) data) = chunk_id; + return SCAN_DONE; + } + } + + return SCAN_CONTINUE; +} + +int32 +ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id, StrategyNumber start_strategy, + int64 start_value, StrategyNumber end_strategy, + int64 end_value) +{ + int32 chunk_id_ret = INVALID_CHUNK_ID; + dimension_slice_scan_with_strategies(dimension_id, + start_strategy, + start_value, + end_strategy, + end_value, + &chunk_id_ret, + dimension_slice_check_is_chunk_uncompressed_tuple_found, + -1); + + return chunk_id_ret; +} diff --git a/src/dimension_slice.h b/src/dimension_slice.h index 1e3b3ff4c..f28d45d41 100644 --- a/src/dimension_slice.h +++ b/src/dimension_slice.h @@ -76,7 +76,11 @@ extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_latest_slice(int32 dim extern TSDLLEXPORT int ts_dimension_slice_oldest_chunk_without_executed_job( int32 job_id, int32 dimension_id, StrategyNumber start_strategy, int64 start_value, StrategyNumber end_strategy, int64 end_value); - +extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id, + StrategyNumber start_strategy, + int64 start_value, + StrategyNumber end_strategy, + int64 end_value); #define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1) #define dimension_slice_scan(dimension_id, coordinate) \ diff --git a/src/hypertable.c b/src/hypertable.c index 28707be23..8d2d01834 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -45,6 +45,7 @@ #include "hypertable_cache.h" #include "trigger.h" #include "scanner.h" +#include "scan_iterator.h" #include "catalog.h" #include "dimension_slice.h" #include "dimension_vector.h" @@ -2136,3 +2137,27 @@ ts_hypertable_create_compressed(Oid table_relid, int32 hypertable_id) heap_close(rel, NoLock); return true; } + +/* is this a internal hypertable created for compression */ +TSDLLEXPORT bool +ts_hypertable_is_compressed_internal(int32 compressed_hypertable_id) +{ + bool compressed = false; + ScanIterator iterator = + ts_scan_iterator_create(HYPERTABLE, AccessShareLock, CurrentMemoryContext); + iterator.ctx.index = catalog_get_index(ts_catalog_get(), HYPERTABLE, HYPERTABLE_ID_INDEX); + ts_scan_iterator_scan_key_init(&iterator, + Anum_hypertable_pkey_idx_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(compressed_hypertable_id)); + + ts_scanner_foreach(&iterator) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator); + bool isnull; + compressed = + DatumGetBool(heap_getattr(ti->tuple, Anum_hypertable_compressed, ti->desc, &isnull)); + } + return compressed; +} diff --git a/src/hypertable.h b/src/hypertable.h index dccf618af..cbc35eaf8 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -112,6 +112,7 @@ extern List *ts_hypertable_get_all_by_name(Name schema_name, Name table_name, Me extern bool ts_is_partitioning_column(Hypertable *ht, Index column_attno); extern TSDLLEXPORT bool ts_hypertable_set_compressed_id(Hypertable *ht, int32 compressed_hypertable_id); +extern TSDLLEXPORT bool ts_hypertable_is_compressed_internal(int32 compressed_hypertable_id); #define hypertable_scan(schema, table, tuple_found, data, lockmode, tuplock) \ ts_hypertable_scan_with_memory_context(schema, \ diff --git a/src/hypertable_compression.h b/src/hypertable_compression.h index e0ad35fdc..ec6a76976 100644 --- a/src/hypertable_compression.h +++ b/src/hypertable_compression.h @@ -17,5 +17,6 @@ hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Da bool *nulls); extern TSDLLEXPORT bool hypertable_compression_delete_by_hypertable_id(int32 htid); +extern TSDLLEXPORT bool ts_is_compression_hypertable(int32 hypertable_id); #endif diff --git a/test/expected/extension.out b/test/expected/extension.out index ff98137e4..c97f5a1e7 100644 --- a/test/expected/extension.out +++ b/test/expected/extension.out @@ -14,6 +14,7 @@ WHERE OID IN ( ORDER BY proname; proname ---------------------------------- + add_compress_chunks_policy add_dimension add_drop_chunks_policy add_reorder_policy @@ -39,6 +40,7 @@ ORDER BY proname; last locf move_chunk + remove_compress_chunks_policy remove_drop_chunks_policy remove_reorder_policy reorder_chunk @@ -52,5 +54,5 @@ ORDER BY proname; time_bucket_gapfill timescaledb_post_restore timescaledb_pre_restore -(38 rows) +(40 rows) diff --git a/tsl/src/bgw_policy/CMakeLists.txt b/tsl/src/bgw_policy/CMakeLists.txt index 449b00942..17d1fca14 100644 --- a/tsl/src/bgw_policy/CMakeLists.txt +++ b/tsl/src/bgw_policy/CMakeLists.txt @@ -1,6 +1,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/reorder_api.c ${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks_api.c + ${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks_api.c ${CMAKE_CURRENT_SOURCE_DIR}/job.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/bgw_policy/compress_chunks_api.c b/tsl/src/bgw_policy/compress_chunks_api.c new file mode 100644 index 000000000..c0569b52a --- /dev/null +++ b/tsl/src/bgw_policy/compress_chunks_api.c @@ -0,0 +1,189 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#include +#include +#include +#include + +#include "bgw_policy/compress_chunks.h" +#include "bgw/job.h" +#include "compress_chunks_api.h" +#include "errors.h" +#include "hypertable.h" +#include "hypertable_cache.h" +#include "interval.h" +#include "license.h" +#include "utils.h" + +/* + * Default scheduled interval for compress jobs = default chunk length. + * If this is non-timestamp based hypertable, then default is 1 day + */ +#define DEFAULT_SCHEDULE_INTERVAL \ + DatumGetIntervalP(DirectFunctionCall7(make_interval, \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(1), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Float8GetDatum(0))) +/* Default max runtime is unlimited for compress chunks */ +#define DEFAULT_MAX_RUNTIME \ + DatumGetIntervalP(DirectFunctionCall7(make_interval, \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Float8GetDatum(0))) +/* Right now, there is an infinite number of retries for compress_chunks jobs */ +#define DEFAULT_MAX_RETRIES -1 +/* Default retry period for reorder_jobs is currently 1 hour */ +#define DEFAULT_RETRY_PERIOD \ + DatumGetIntervalP(DirectFunctionCall7(make_interval, \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(0), \ + Int32GetDatum(1), \ + Int32GetDatum(0), \ + Float8GetDatum(0))) + +Datum +compress_chunks_add_policy(PG_FUNCTION_ARGS) +{ + NameData application_name; + NameData compress_chunks_name; + int32 job_id; + BgwPolicyCompressChunks *existing; + Oid ht_oid = PG_GETARG_OID(0); + Datum older_than_datum = PG_GETARG_DATUM(1); + Oid older_than_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1); + bool if_not_exists = PG_GETARG_BOOL(2); + Interval *default_schedule_interval = DEFAULT_SCHEDULE_INTERVAL; + + BgwPolicyCompressChunks policy; + Hypertable *hypertable; + Cache *hcache; + Dimension *dim; + FormData_ts_interval *older_than; + ts_hypertable_permissions_check(ht_oid, GetUserId()); + + older_than = ts_interval_from_sql_input(ht_oid, + older_than_datum, + older_than_type, + "older_than", + "compress_chunks_add_policy"); + /* check if this is a table with compression enabled */ + hcache = ts_hypertable_cache_pin(); + hypertable = ts_hypertable_cache_get_entry(hcache, ht_oid); + if (!hypertable || !TS_HYPERTABLE_HAS_COMPRESSION_ON(hypertable)) + { + ts_cache_release(hcache); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can add compress_chunks policy only on hypertables with compression " + "enabled"))); + } + + /* Make sure that an existing policy doesn't exist on this hypertable */ + existing = ts_bgw_policy_compress_chunks_find_by_hypertable(hypertable->fd.id); + + if (existing != NULL) + { + if (!if_not_exists) + { + ts_cache_release(hcache); + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("compress chunks policy already exists for hypertable \"%s\"", + get_rel_name(ht_oid)))); + } + if (ts_interval_equal(&existing->fd.older_than, older_than)) + { + /* If all arguments are the same, do nothing */ + ts_cache_release(hcache); + ereport(NOTICE, + (errmsg("compress chunks policy already exists on hypertable \"%s\", skipping", + get_rel_name(ht_oid)))); + PG_RETURN_INT32(-1); + } + else + { + ts_cache_release(hcache); + elog(WARNING, + "could not add compress_chunks policy due to existing policy on hypertable with " + "different arguments"); + PG_RETURN_INT32(-1); + } + } + dim = hyperspace_get_open_dimension(hypertable->space, 0); + + if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim))) + { + default_schedule_interval = DatumGetIntervalP( + ts_internal_to_interval_value(dim->fd.interval_length / 2, INTERVALOID)); + } + + /* insert a new job into jobs table */ + namestrcpy(&application_name, "Compress Chunks Background Job"); + namestrcpy(&compress_chunks_name, "compress_chunks"); + job_id = ts_bgw_job_insert_relation(&application_name, + &compress_chunks_name, + default_schedule_interval, + DEFAULT_MAX_RUNTIME, + DEFAULT_MAX_RETRIES, + DEFAULT_RETRY_PERIOD); + + policy = (BgwPolicyCompressChunks){ .fd = { + .job_id = job_id, + .hypertable_id = ts_hypertable_relid_to_id(ht_oid), + .older_than = *older_than, + } }; + + /* Now, insert a new row in the compress_chunks args table */ + ts_bgw_policy_compress_chunks_insert(&policy); + ts_cache_release(hcache); + + PG_RETURN_INT32(job_id); +} + +Datum +compress_chunks_remove_policy(PG_FUNCTION_ARGS) +{ + Oid hypertable_oid = PG_GETARG_OID(0); + bool if_exists = PG_GETARG_BOOL(1); + + /* Remove the job, then remove the policy */ + int ht_id = ts_hypertable_relid_to_id(hypertable_oid); + BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_hypertable(ht_id); + + ts_hypertable_permissions_check(hypertable_oid, GetUserId()); + + if (policy == NULL) + { + if (!if_exists) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("cannot remove compress chunks policy, no such policy exists"))); + else + { + ereport(NOTICE, + (errmsg("compress chunks policy does not exist on hypertable \"%s\", skipping", + get_rel_name(hypertable_oid)))); + PG_RETURN_BOOL(false); + } + } + + ts_bgw_job_delete_by_id(policy->fd.job_id); + /* Now, delete from policy table */ + ts_bgw_policy_compress_chunks_delete(policy->fd.job_id); + + PG_RETURN_BOOL(true); +} diff --git a/tsl/src/bgw_policy/compress_chunks_api.h b/tsl/src/bgw_policy/compress_chunks_api.h new file mode 100644 index 000000000..4854bc4f0 --- /dev/null +++ b/tsl/src/bgw_policy/compress_chunks_api.h @@ -0,0 +1,15 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#ifndef TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H +#define TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H + +#include + +/* User-facing API functions */ +extern Datum compress_chunks_add_policy(PG_FUNCTION_ARGS); +extern Datum compress_chunks_remove_policy(PG_FUNCTION_ARGS); +#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H */ diff --git a/tsl/src/bgw_policy/drop_chunks_api.c b/tsl/src/bgw_policy/drop_chunks_api.c index c87f2742a..6d77d3b60 100644 --- a/tsl/src/bgw_policy/drop_chunks_api.c +++ b/tsl/src/bgw_policy/drop_chunks_api.c @@ -88,7 +88,7 @@ drop_chunks_add_policy(PG_FUNCTION_ARGS) ereport(NOTICE, (errmsg("drop chunks policy already exists on hypertable \"%s\", skipping", get_rel_name(ht_oid)))); - return -1; + PG_RETURN_INT32(-1); } else { @@ -96,7 +96,7 @@ drop_chunks_add_policy(PG_FUNCTION_ARGS) elog(WARNING, "could not add drop_chunks policy due to existing policy on hypertable with " "different arguments"); - return -1; + PG_RETURN_INT32(-1); } } diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index 155bf3f0a..a3f924389 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -15,10 +15,13 @@ #include #include "bgw/timer.h" +#include "bgw/job.h" #include "bgw/job_stat.h" #include "bgw_policy/chunk_stats.h" #include "bgw_policy/drop_chunks.h" +#include "bgw_policy/compress_chunks.h" #include "bgw_policy/reorder.h" +#include "compression/compress_utils.h" #include "continuous_aggs/materialize.h" #include "continuous_aggs/job.h" @@ -77,6 +80,21 @@ get_chunk_id_to_reorder(int32 job_id, Hypertable *ht) -1); } +static int32 +get_chunk_to_compress(Hypertable *ht, FormData_ts_interval *older_than) +{ + Dimension *open_dim = hyperspace_get_open_dimension(ht->space, 0); + StrategyNumber end_strategy = BTLessStrategyNumber; + Oid partitioning_type = ts_dimension_get_partition_type(open_dim); + int64 end_value = ts_time_value_to_internal(ts_interval_subtract_from_now(older_than, open_dim), + partitioning_type); + return ts_dimension_slice_get_chunkid_to_compress(open_dim->fd.id, + InvalidStrategy, /*start_strategy*/ + -1, /*start_value*/ + end_strategy, + end_value); +} + bool execute_reorder_policy(BgwJob *job, reorder_func reorder, bool fast_continue) { @@ -241,6 +259,77 @@ execute_materialize_continuous_aggregate(BgwJob *job) return true; } +bool +execute_compress_chunks_policy(BgwJob *job) +{ + bool started = false; + BgwPolicyCompressChunks *args; + Oid table_relid; + Hypertable *ht; + Cache *hcache; + int32 chunkid; + Chunk *chunk = NULL; + int job_id = job->fd.id; + + if (!IsTransactionOrTransactionBlock()) + { + started = true; + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + } + + /* Get the arguments from the compress_chunks_policy table */ + args = ts_bgw_policy_compress_chunks_find_by_job(job_id); + + if (args == NULL) + ereport(ERROR, + (errcode(ERRCODE_TS_INTERNAL_ERROR), + errmsg("could not run compress_chunks policy #%d because no args in policy table", + job_id))); + + table_relid = ts_hypertable_id_to_relid(args->fd.hypertable_id); + hcache = ts_hypertable_cache_pin(); + ht = ts_hypertable_cache_get_entry(hcache, table_relid); + /* First verify that the hypertable corresponds to a valid table */ + if (ht == NULL) + ereport(ERROR, + (errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST), + errmsg("could not run compress_chunks policy #%d because \"%s\" is not a " + "hypertable", + job_id, + get_rel_name(table_relid)))); + + chunkid = get_chunk_to_compress(ht, &args->fd.older_than); + if (chunkid == INVALID_CHUNK_ID) + { + elog(NOTICE, + "no chunks for hypertable %s.%s that satisfy compress chunk policy", + ht->fd.schema_name.data, + ht->fd.table_name.data); + } + else + { + chunk = ts_chunk_get_by_id(chunkid, 0, true); + tsl_compress_chunk_wrapper(chunk->table_id); + elog(LOG, + "completed compressing chunk %s.%s", + NameStr(chunk->fd.schema_name), + NameStr(chunk->fd.table_name)); + } + + chunkid = get_chunk_to_compress(ht, &args->fd.older_than); + if (chunkid != INVALID_CHUNK_ID) + enable_fast_restart(job, "compress_chunks"); + + ts_cache_release(hcache); + if (started) + { + PopActiveSnapshot(); + CommitTransactionCommand(); + } + return true; +} + static bool bgw_policy_job_requires_enterprise_license(BgwJob *job) { @@ -254,6 +343,8 @@ bgw_policy_job_requires_enterprise_license(BgwJob *job) return true; case JOB_TYPE_CONTINUOUS_AGGREGATE: return false; + case JOB_TYPE_COMPRESS_CHUNKS: + return true; default: elog(ERROR, "scheduler could not determine the license type for job type: \"%s\"", @@ -277,6 +368,8 @@ tsl_bgw_policy_job_execute(BgwJob *job) return execute_drop_chunks_policy(job->fd.id); case JOB_TYPE_CONTINUOUS_AGGREGATE: return execute_materialize_continuous_aggregate(job); + case JOB_TYPE_COMPRESS_CHUNKS: + return execute_compress_chunks_policy(job); default: elog(ERROR, "scheduler tried to run an invalid job type: \"%s\"", diff --git a/tsl/src/bgw_policy/job.h b/tsl/src/bgw_policy/job.h index 1c8ae8dc4..3c4dacc52 100644 --- a/tsl/src/bgw_policy/job.h +++ b/tsl/src/bgw_policy/job.h @@ -20,7 +20,7 @@ typedef void (*reorder_func)(Oid tableOid, Oid indexOid, bool verbose, Oid wait_ /* Functions exposed only for testing */ extern bool execute_reorder_policy(BgwJob *job, reorder_func reorder, bool fast_continue); extern bool execute_drop_chunks_policy(int32 job_id); - +extern bool execute_compress_chunks_policy(BgwJob *job); extern bool tsl_bgw_policy_job_execute(BgwJob *job); extern Datum bgw_policy_alter_job_schedule(PG_FUNCTION_ARGS); diff --git a/tsl/src/bgw_policy/reorder_api.c b/tsl/src/bgw_policy/reorder_api.c index c8b214c83..6b2f9a434 100644 --- a/tsl/src/bgw_policy/reorder_api.c +++ b/tsl/src/bgw_policy/reorder_api.c @@ -120,13 +120,13 @@ reorder_add_policy(PG_FUNCTION_ARGS) elog(WARNING, "could not add reorder policy due to existing policy on hypertable with different " "arguments"); - return -1; + PG_RETURN_INT32(-1); } /* If all arguments are the same, do nothing */ ereport(NOTICE, (errmsg("reorder policy already exists on hypertable \"%s\", skipping", get_rel_name(ht_oid)))); - return -1; + PG_RETURN_INT32(-1); } /* Next, insert a new job into jobs table */ diff --git a/tsl/src/compression/compress_utils.c b/tsl/src/compression/compress_utils.c index e6d373989..45527b1a8 100644 --- a/tsl/src/compression/compress_utils.c +++ b/tsl/src/compression/compress_utils.c @@ -281,20 +281,23 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_ ts_cache_release(hcache); } -Datum -tsl_compress_chunk(PG_FUNCTION_ARGS) +void +tsl_compress_chunk_wrapper(Oid chunk_relid) { - Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); - Chunk *srcchunk = ts_chunk_get_by_relid(chunk_id, 0, true); + Chunk *srcchunk = ts_chunk_get_by_relid(chunk_relid, 0, true); if (srcchunk->fd.compressed_chunk_id != INVALID_CHUNK_ID) { ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("chunk is already compressed"))); } - license_enforce_enterprise_enabled(); - license_print_expiration_warning_if_needed(); + compress_chunk_impl(srcchunk->hypertable_relid, chunk_relid); +} - compress_chunk_impl(srcchunk->hypertable_relid, chunk_id); +Datum +tsl_compress_chunk(PG_FUNCTION_ARGS) +{ + Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); + tsl_compress_chunk_wrapper(chunk_id); PG_RETURN_VOID(); } @@ -306,9 +309,6 @@ tsl_decompress_chunk(PG_FUNCTION_ARGS) if (NULL == uncompressed_chunk) elog(ERROR, "unkown chunk id %d", uncompressed_chunk_id); - license_enforce_enterprise_enabled(); - license_print_expiration_warning_if_needed(); - decompress_chunk_impl(uncompressed_chunk->hypertable_relid, uncompressed_chunk_id); PG_RETURN_VOID(); } diff --git a/tsl/src/compression/compress_utils.h b/tsl/src/compression/compress_utils.h index 9cc9b132e..b8a1e6ff8 100644 --- a/tsl/src/compression/compress_utils.h +++ b/tsl/src/compression/compress_utils.h @@ -8,5 +8,6 @@ extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS); extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS); +extern void tsl_compress_chunk_wrapper(Oid chunk_relid); #endif // TIMESCALEDB_TSL_COMPRESSION_UTILS_H diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c index 6a45afe25..9859a3a1e 100644 --- a/tsl/src/compression/create.c +++ b/tsl/src/compression/create.c @@ -4,7 +4,6 @@ * LICENSE-TIMESCALE for a copy of the license. */ #include -#include #include #include #include @@ -13,24 +12,24 @@ #include #include #include +#include #include #include #include #include #include "catalog.h" -#include "compat.h" #include "create.h" #include "chunk.h" #include "chunk_index.h" -#include "trigger.h" -#include "scan_iterator.h" -#include "hypertable_cache.h" +#include "continuous_agg.h" #include "compression_with_clause.h" #include "compression.h" +#include "hypertable_cache.h" #include "hypertable_compression.h" #include "custom_type_cache.h" #include "license.h" +#include "trigger.h" /* entrypoint * tsl_process_compress_table : is the entry point. @@ -489,9 +488,25 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht, List *orderby_cols; bool compression_already_enabled; bool compressed_chunks_exist; + ContinuousAggHypertableStatus caggstat; - license_enforce_enterprise_enabled(); - license_print_expiration_warning_if_needed(); + /*check this is not a special internally created hypertable + * continuous agg table + * compression hypertable + */ + caggstat = ts_continuous_agg_hypertable_status(ht->fd.id); + if (!(caggstat == HypertableIsRawTable || caggstat == HypertableIsNotContinuousAgg)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("continuous aggregate tables do not support compression"))); + } + if (ht->fd.compressed) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot compress internal compression hypertable"))); + } /* Lock the uncompressed ht in exclusive mode and keep till end of txn */ LockRelationOid(ht->main_table_relid, AccessExclusiveLock); @@ -502,7 +517,7 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht, segmentby_cols = ts_compress_hypertable_parse_segment_by(with_clause_options, ht); orderby_cols = ts_compress_hypertable_parse_order_by(with_clause_options, ht); orderby_cols = add_time_to_order_by_if_not_included(orderby_cols, segmentby_cols, ht); - compression_already_enabled = ht->fd.compressed_hypertable_id != INVALID_HYPERTABLE_ID; + compression_already_enabled = TS_HYPERTABLE_HAS_COMPRESSION_ON(ht); compressed_chunks_exist = compression_already_enabled && ts_chunk_exists_with_compression(ht->fd.id); diff --git a/tsl/src/init.c b/tsl/src/init.c index 6cf1121b1..0627dc967 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -20,6 +20,7 @@ #include "bgw_policy/job.h" #include "bgw_policy/reorder_api.h" #include "bgw_policy/drop_chunks_api.h" +#include "bgw_policy/compress_chunks_api.h" #include "compression/compression.h" #include "compression/dictionary.h" #include "compression/gorilla.h" @@ -73,8 +74,10 @@ CrossModuleFunctions tsl_cm_functions = { .continuous_agg_materialize = continuous_agg_materialize, .add_drop_chunks_policy = drop_chunks_add_policy, .add_reorder_policy = reorder_add_policy, + .add_compress_chunks_policy = compress_chunks_add_policy, .remove_drop_chunks_policy = drop_chunks_remove_policy, .remove_reorder_policy = reorder_remove_policy, + .remove_compress_chunks_policy = compress_chunks_remove_policy, .create_upper_paths_hook = tsl_create_upper_paths_hook, .set_rel_pathlist_hook = tsl_set_rel_pathlist_hook, .gapfill_marker = gapfill_marker, diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out index f8fcd2cf5..20eaac9ff 100644 --- a/tsl/test/expected/compression.out +++ b/tsl/test/expected/compression.out @@ -17,7 +17,6 @@ insert into foo values( 10 , 10 , 20, 120); insert into foo values( 20 , 11 , 20, 13); insert into foo values( 30 , 12 , 20, 14); alter table foo set (timescaledb.compress, timescaledb.compress_segmentby = 'a,b', timescaledb.compress_orderby = 'c desc, d asc nulls last'); -WARNING: Timescale License expired select id, schema_name, table_name, compressed, compressed_hypertable_id from _timescaledb_catalog.hypertable order by id; id | schema_name | table_name | compressed | compressed_hypertable_id diff --git a/tsl/test/expected/compression_bgw.out b/tsl/test/expected/compression_bgw.out new file mode 100644 index 000000000..1befd5e85 --- /dev/null +++ b/tsl/test/expected/compression_bgw.out @@ -0,0 +1,186 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +SELECT _timescaledb_internal.stop_background_workers(); + stop_background_workers +------------------------- + t +(1 row) + +SELECT _timescaledb_internal.enterprise_enabled(); + enterprise_enabled +-------------------- + t +(1 row) + +CREATE OR REPLACE FUNCTION test_compress_chunks_policy(job_id INTEGER) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_test_auto_compress_chunks' +LANGUAGE C VOLATILE STRICT; +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER +CREATE TABLE conditions ( + time TIMESTAMPTZ NOT NULL, + location TEXT NOT NULL, + location2 char(10) NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL + ); +select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::interval); + create_hypertable +------------------------- + (1,public,conditions,t) +(1 row) + +--TEST 1-- +--cannot set policy without enabling compression -- +\set ON_ERROR_STOP 0 +select add_compress_chunks_policy('conditions', '60d'::interval); +ERROR: can add compress_chunks policy only on hypertables with compression enabled +\set ON_ERROR_STOP 1 +-- TEST2 -- +--add a policy to compress chunks -- +alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time'); +insert into conditions +select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75; +select add_compress_chunks_policy('conditions', '60d'::interval); + add_compress_chunks_policy +---------------------------- + 1000 +(1 row) + +select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; + compressjob_id | hypertable_id | older_than +----------------+---------------+------------------ + 1000 | 1 | (t,"@ 60 days",) +(1 row) + +\gset +select * from _timescaledb_config.bgw_job where job_type like 'compress%'; + id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period +------+--------------------------------+-----------------+--------------------+-------------+-------------+-------------- + 1000 | Compress Chunks Background Job | compress_chunks | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour +(1 row) + +select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s'); +WARNING: Timescale License expired + job_id | schedule_interval | max_runtime | max_retries | retry_period +--------+-------------------+-------------+-------------+-------------- + 1000 | @ 1 sec | @ 0 | -1 | @ 1 hour +(1 row) + +select * from _timescaledb_config.bgw_job where job_type like 'compress%'; + id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period +------+--------------------------------+-----------------+-------------------+-------------+-------------+-------------- + 1000 | Compress Chunks Background Job | compress_chunks | @ 1 sec | @ 0 | -1 | @ 1 hour +(1 row) + +insert into conditions +select generate_series(now()::timestamp, now()::timestamp+'1day', '1 min'), 'TOK', 'sony', 55, 75; +-- TEST3 -- +--only the old chunks will get compressed when policy is executed-- +select test_compress_chunks_policy(:compressjob_id); + test_compress_chunks_policy +----------------------------- + +(1 row) + +select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size order by chunk_name; + hypertable_name | chunk_name | uncompressed_total_bytes | compressed_total_bytes +-----------------+----------------------------------------+--------------------------+------------------------ + conditions | _timescaledb_internal._hyper_1_1_chunk | 32 kB | 16 kB +(1 row) + +-- TEST 4 -- +--cannot set another policy +\set ON_ERROR_STOP 0 +select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true); +NOTICE: compress chunks policy already exists on hypertable "conditions", skipping + add_compress_chunks_policy +---------------------------- + -1 +(1 row) + +select add_compress_chunks_policy('conditions', '60d'::interval); +ERROR: compress chunks policy already exists for hypertable "conditions" +select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true); +WARNING: could not add compress_chunks policy due to existing policy on hypertable with different arguments + add_compress_chunks_policy +---------------------------- + -1 +(1 row) + +\set ON_ERROR_STOP 1 +--TEST 5 -- +-- drop the policy -- +select remove_compress_chunks_policy('conditions'); + remove_compress_chunks_policy +------------------------------- + t +(1 row) + +select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; + compressjob_id | hypertable_id | older_than +----------------+---------------+------------ +(0 rows) + +--TEST 6 -- +-- try to execute the policy after it has been dropped -- +\set ON_ERROR_STOP 0 +select test_compress_chunks_policy(:compressjob_id); +ERROR: bgw job not found +\set ON_ERROR_STOP 1 +--TEST 7 +--compress chunks policy for integer based partition hypertable +CREATE TABLE test_table_int(time bigint, val int); +SELECT create_hypertable('test_table_int', 'time', chunk_time_interval => 1); +NOTICE: adding not-null constraint to column "time" + create_hypertable +----------------------------- + (3,public,test_table_int,t) +(1 row) + +create or replace function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 5::BIGINT'; +select set_integer_now_func('test_table_int', 'dummy_now'); + set_integer_now_func +---------------------- + +(1 row) + +insert into test_table_int select generate_series(1,5), 10; +alter table test_table_int set (timescaledb.compress); +select add_compress_chunks_policy('test_table_int', 2::int); + add_compress_chunks_policy +---------------------------- + 1001 +(1 row) + +select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks +where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int'); + compressjob_id | hypertable_id | older_than +----------------+---------------+------------ + 1001 | 3 | (f,,2) +(1 row) + +\gset +select test_compress_chunks_policy(:compressjob_id); + test_compress_chunks_policy +----------------------------- + +(1 row) + +select test_compress_chunks_policy(:compressjob_id); + test_compress_chunks_policy +----------------------------- + +(1 row) + +select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size +where hypertable_name::text like 'test_table_int' +order by chunk_name; + hypertable_name | chunk_name | uncompressed_total_bytes | compressed_total_bytes +-----------------+----------------------------------------+--------------------------+------------------------ + test_table_int | _timescaledb_internal._hyper_3_5_chunk | 24 kB | 16 kB + test_table_int | _timescaledb_internal._hyper_3_6_chunk | 24 kB | 16 kB +(2 rows) + diff --git a/tsl/test/expected/compression_errors.out b/tsl/test/expected/compression_errors.out index 58f015c44..e8be874ad 100644 --- a/tsl/test/expected/compression_errors.out +++ b/tsl/test/expected/compression_errors.out @@ -31,7 +31,6 @@ insert into non_compressed values( 3 , 16 , 20, 4); ALTER TABLE foo2 set (timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c'); ERROR: must set the 'compress' boolean option when setting compression options ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c'); -WARNING: Timescale License expired ERROR: cannot use the same column c in compress_orderby and compress_segmentby ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd DESC'); ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd'); diff --git a/tsl/test/expected/compression_hypertable.out b/tsl/test/expected/compression_hypertable.out index 22110ddae..14fcabebf 100644 --- a/tsl/test/expected/compression_hypertable.out +++ b/tsl/test/expected/compression_hypertable.out @@ -38,7 +38,6 @@ NOTICE: adding not-null constraint to column "Time" INSERT INTO test1 SELECT t, gen_rand_minstd(), gen_rand_minstd(), gen_rand_minstd()::text FROM generate_series('2018-03-02 1:00'::TIMESTAMPTZ, '2018-03-28 1:00', '1 hour') t; ALTER TABLE test1 set (timescaledb.compress, timescaledb.compress_segmentby = '', timescaledb.compress_orderby = '"Time" DESC'); -WARNING: Timescale License expired SELECT $$ SELECT * FROM test1 ORDER BY "Time" @@ -68,7 +67,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this Number of rows different between original and query on compressed data (expect 0) | 0 (1 row) -psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired count_decompressed -------------------- 27 @@ -144,7 +142,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this Number of rows different between original and query on compressed data (expect 0) | 0 (1 row) -psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired count_decompressed -------------------- 5 @@ -253,7 +250,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this Number of rows different between original and query on compressed data (expect 0) | 0 (1 row) -psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired count_decompressed -------------------- 1 @@ -327,7 +323,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this Number of rows different between original and query on compressed data (expect 0) | 0 (1 row) -psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired count_decompressed -------------------- 10 diff --git a/tsl/test/expected/transparent_decompression-10.out b/tsl/test/expected/transparent_decompression-10.out index 0b15780a1..e17a0fc24 100644 --- a/tsl/test/expected/transparent_decompression-10.out +++ b/tsl/test/expected/transparent_decompression-10.out @@ -44,7 +44,6 @@ ANALYZE metrics_space; \set ECHO none -- compress first and last chunk on the hypertable ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id'); -WARNING: Timescale License expired SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'); compress_chunk ---------------- diff --git a/tsl/test/expected/transparent_decompression-11.out b/tsl/test/expected/transparent_decompression-11.out index 15e415eb9..ef2c91f0e 100644 --- a/tsl/test/expected/transparent_decompression-11.out +++ b/tsl/test/expected/transparent_decompression-11.out @@ -44,7 +44,6 @@ ANALYZE metrics_space; \set ECHO none -- compress first and last chunk on the hypertable ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id'); -WARNING: Timescale License expired SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'); compress_chunk ---------------- diff --git a/tsl/test/expected/transparent_decompression-9.6.out b/tsl/test/expected/transparent_decompression-9.6.out index 97ae3890a..361b050f2 100644 --- a/tsl/test/expected/transparent_decompression-9.6.out +++ b/tsl/test/expected/transparent_decompression-9.6.out @@ -44,7 +44,6 @@ ANALYZE metrics_space; \set ECHO none -- compress first and last chunk on the hypertable ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id'); -WARNING: Timescale License expired SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'); compress_chunk ---------------- diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index ebbebc84a..c574afdcb 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -19,6 +19,7 @@ set(TEST_FILES_DEBUG compression_errors.sql compression_hypertable.sql compression_segment_meta.sql + compression_bgw.sql continuous_aggs.sql continuous_aggs_bgw.sql continuous_aggs_materialize.sql diff --git a/tsl/test/sql/compression_bgw.sql b/tsl/test/sql/compression_bgw.sql new file mode 100644 index 000000000..cf3509c10 --- /dev/null +++ b/tsl/test/sql/compression_bgw.sql @@ -0,0 +1,89 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\c :TEST_DBNAME :ROLE_SUPERUSER +SELECT _timescaledb_internal.stop_background_workers(); +SELECT _timescaledb_internal.enterprise_enabled(); + +CREATE OR REPLACE FUNCTION test_compress_chunks_policy(job_id INTEGER) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_test_auto_compress_chunks' +LANGUAGE C VOLATILE STRICT; + +\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER + +CREATE TABLE conditions ( + time TIMESTAMPTZ NOT NULL, + location TEXT NOT NULL, + location2 char(10) NOT NULL, + temperature DOUBLE PRECISION NULL, + humidity DOUBLE PRECISION NULL + ); +select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::interval); + +--TEST 1-- +--cannot set policy without enabling compression -- +\set ON_ERROR_STOP 0 +select add_compress_chunks_policy('conditions', '60d'::interval); +\set ON_ERROR_STOP 1 + +-- TEST2 -- +--add a policy to compress chunks -- +alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time'); +insert into conditions +select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75; + +select add_compress_chunks_policy('conditions', '60d'::interval); +select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; +\gset +select * from _timescaledb_config.bgw_job where job_type like 'compress%'; +select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s'); +select * from _timescaledb_config.bgw_job where job_type like 'compress%'; +insert into conditions +select generate_series(now()::timestamp, now()::timestamp+'1day', '1 min'), 'TOK', 'sony', 55, 75; + +-- TEST3 -- +--only the old chunks will get compressed when policy is executed-- +select test_compress_chunks_policy(:compressjob_id); +select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size order by chunk_name; + +-- TEST 4 -- +--cannot set another policy +\set ON_ERROR_STOP 0 +select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true); +select add_compress_chunks_policy('conditions', '60d'::interval); +select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true); +\set ON_ERROR_STOP 1 + +--TEST 5 -- +-- drop the policy -- +select remove_compress_chunks_policy('conditions'); +select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; + +--TEST 6 -- +-- try to execute the policy after it has been dropped -- +\set ON_ERROR_STOP 0 +select test_compress_chunks_policy(:compressjob_id); +\set ON_ERROR_STOP 1 + +--TEST 7 +--compress chunks policy for integer based partition hypertable +CREATE TABLE test_table_int(time bigint, val int); +SELECT create_hypertable('test_table_int', 'time', chunk_time_interval => 1); + +create or replace function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 5::BIGINT'; +select set_integer_now_func('test_table_int', 'dummy_now'); +insert into test_table_int select generate_series(1,5), 10; +alter table test_table_int set (timescaledb.compress); +select add_compress_chunks_policy('test_table_int', 2::int); + +select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks +where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int'); +\gset +select test_compress_chunks_policy(:compressjob_id); +select test_compress_chunks_policy(:compressjob_id); +select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size +where hypertable_name::text like 'test_table_int' +order by chunk_name; + diff --git a/tsl/test/src/test_auto_policy.c b/tsl/test/src/test_auto_policy.c index c7c5edc06..fe5e88bf1 100644 --- a/tsl/test/src/test_auto_policy.c +++ b/tsl/test/src/test_auto_policy.c @@ -11,6 +11,7 @@ #include #include "bgw_policy/job.h" +#include "bgw/job_stat.h" #include "chunk.h" #include "reorder.h" @@ -18,6 +19,7 @@ TS_FUNCTION_INFO_V1(ts_test_auto_reorder); TS_FUNCTION_INFO_V1(ts_test_auto_drop_chunks); +TS_FUNCTION_INFO_V1(ts_test_auto_compress_chunks); static Oid chunk_oid; static Oid index_oid; @@ -67,3 +69,15 @@ ts_test_auto_drop_chunks(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } + +/* Call the real compress_chunks policy */ +Datum +ts_test_auto_compress_chunks(PG_FUNCTION_ARGS) +{ + int32 job_id = PG_GETARG_INT32(0); + BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, true); + /*since we didn't come through the scheduler, need to mark job + * as started to create a job_stat record */ + ts_bgw_job_stat_mark_start(job_id); + return execute_compress_chunks_policy(job); +}