diff --git a/sql/bgw_scheduler.sql b/sql/bgw_scheduler.sql index fd0a02cd7..0756f5396 100644 --- a/sql/bgw_scheduler.sql +++ b/sql/bgw_scheduler.sql @@ -38,6 +38,12 @@ CREATE OR REPLACE FUNCTION add_retention_policy( RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_add_retention_policy' LANGUAGE C VOLATILE STRICT; +-- Remove the retention policy from a hypertable +CREATE OR REPLACE FUNCTION remove_retention_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID +AS '@MODULE_PATHNAME@', 'ts_remove_retention_policy' +LANGUAGE C VOLATILE STRICT; + +/* reorder policy */ CREATE OR REPLACE FUNCTION add_reorder_policy(hypertable REGCLASS, index_name NAME, if_not_exists BOOL = false) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add' LANGUAGE C VOLATILE STRICT; @@ -50,19 +56,19 @@ CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_reorder(job_id INTEGER, AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc' LANGUAGE C; -CREATE OR REPLACE FUNCTION add_compress_chunks_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false) +/* compression policy */ +CREATE OR REPLACE FUNCTION add_compression_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false) RETURNS INTEGER -AS '@MODULE_PATHNAME@', 'ts_add_compress_chunks_policy' +AS '@MODULE_PATHNAME@', 'ts_policy_compression_add' LANGUAGE C VOLATILE STRICT; --- Remove the retention policy from a hypertable -CREATE OR REPLACE FUNCTION remove_retention_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID -AS '@MODULE_PATHNAME@', 'ts_remove_retention_policy' +CREATE OR REPLACE FUNCTION remove_compression_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL +AS '@MODULE_PATHNAME@', 'ts_policy_compression_remove' LANGUAGE C VOLATILE STRICT; -CREATE OR REPLACE FUNCTION remove_compress_chunks_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL -AS '@MODULE_PATHNAME@', 'ts_remove_compress_chunks_policy' -LANGUAGE C VOLATILE STRICT; +CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTEGER, config JSONB) +AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc' +LANGUAGE C; -- Returns the updated job schedule values CREATE OR REPLACE FUNCTION alter_job_schedule( diff --git a/sql/pre_install/tables.sql b/sql/pre_install/tables.sql index 8cd1faf36..26771be6d 100644 --- a/sql/pre_install/tables.sql +++ b/sql/pre_install/tables.sql @@ -364,15 +364,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size ( ); SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', ''); -CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_policy_compress_chunks( - job_id INTEGER PRIMARY KEY REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE, - hypertable_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE, - older_than _timescaledb_catalog.ts_interval NOT NULL, - CONSTRAINT valid_older_than CHECK(_timescaledb_internal.valid_ts_interval(older_than)) -); - -SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_compress_chunks', ''); - --This stores commit decisions for 2pc remote txns. Abort decisions are never stored. --If a PREPARE TRANSACTION fails for any data node then the entire --frontend transaction will be rolled back and no rows will be stored. diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index f49781d44..6e440e564 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -162,6 +162,23 @@ FROM _timescaledb_config.bgw_policy_reorder reorder WHERE job_type = 'reorder' AND job.id = reorder.job_id; +-- migrate compression jobs +UPDATE + _timescaledb_config.bgw_job job +SET proc_name = 'policy_compression', + proc_schema = '_timescaledb_internal', + config = jsonb_build_object('hypertable_id', c.hypertable_id, 'older_than', CASE WHEN (older_than).is_time_interval THEN (older_than).time_interval::text ELSE (older_than).integer_interval::text END), + hypertable_id = c.hypertable_id, + OWNER = ( + SELECT relowner::regrole::text + FROM _timescaledb_catalog.hypertable ht, + pg_class cl + WHERE ht.id = c.hypertable_id + AND cl.oid = format('%I.%I', schema_name, table_name)::regclass) +FROM _timescaledb_config.bgw_policy_compress_chunks c +WHERE job_type = 'compress_chunks' + AND job.id = c.job_id; + --rewrite catalog table to not break catalog scans on tables with missingval optimization CLUSTER _timescaledb_config.bgw_job USING bgw_job_pkey; ALTER TABLE _timescaledb_config.bgw_job SET WITHOUT CLUSTER; @@ -169,5 +186,10 @@ ALTER TABLE _timescaledb_config.bgw_job SET WITHOUT CLUSTER; CREATE INDEX IF NOT EXISTS bgw_job_proc_hypertable_id_idx ON _timescaledb_config.bgw_job(proc_name,proc_schema,hypertable_id); ALTER EXTENSION timescaledb DROP TABLE _timescaledb_config.bgw_policy_reorder; -DROP TABLE _timescaledb_config.bgw_policy_reorder CASCADE; +ALTER EXTENSION timescaledb DROP TABLE _timescaledb_config.bgw_policy_compress_chunks; +DROP TABLE IF EXISTS _timescaledb_config.bgw_policy_reorder CASCADE; +DROP TABLE IF EXISTS _timescaledb_config.bgw_policy_compress_chunks CASCADE; + +DROP FUNCTION IF EXISTS add_compress_chunks_policy; +DROP FUNCTION IF EXISTS remove_compress_chunks_policy; diff --git a/sql/views.sql b/sql/views.sql index 81495b317..44f7b2088 100644 --- a/sql/views.sql +++ b/sql/views.sql @@ -68,9 +68,8 @@ WHERE job_type = 'reorder'; CREATE OR REPLACE VIEW timescaledb_information.policy_stats as SELECT format('%1$I.%2$I', ht.schema_name, ht.table_name)::regclass as hypertable, p.job_id, j.job_type, js.last_run_success, js.last_finish, js.last_successful_finish, js.last_start, js.next_start, js.total_runs, js.total_failures - FROM (SELECT id AS job_id, hypertable_id FROM _timescaledb_config.bgw_job WHERE job_type IN ('reorder') + FROM (SELECT id AS job_id, hypertable_id FROM _timescaledb_config.bgw_job WHERE job_type IN ('reorder','compress_chunks') UNION SELECT job_id, hypertable_id FROM _timescaledb_config.bgw_policy_drop_chunks - UNION SELECT job_id, hypertable_id FROM _timescaledb_config.bgw_policy_compress_chunks UNION SELECT job_id, raw_hypertable_id FROM _timescaledb_catalog.continuous_agg) p INNER JOIN _timescaledb_catalog.hypertable ht ON p.hypertable_id = ht.id INNER JOIN _timescaledb_config.bgw_job j ON p.job_id = j.id diff --git a/src/bgw/job.c b/src/bgw/job.c index e39c3f2cc..9872f2ba1 100644 --- a/src/bgw/job.c +++ b/src/bgw/job.c @@ -32,7 +32,6 @@ #include "telemetry/telemetry.h" #include "bgw_policy/chunk_stats.h" #include "bgw_policy/drop_chunks.h" -#include "bgw_policy/compress_chunks.h" #include "bgw_policy/policy.h" #include "scan_iterator.h" @@ -128,14 +127,8 @@ ts_bgw_job_owner(BgwJob *job) return ts_rel_get_owner(ts_continuous_agg_get_user_view_oid(ca)); } - case JOB_TYPE_COMPRESS_CHUNKS: - { - BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_job(job->fd.id); - if (policy == NULL) - elog(ERROR, "compress chunks policy for job with id \"%d\" not found", job->fd.id); - return ts_rel_get_owner(ts_hypertable_id_to_relid(policy->fd.hypertable_id)); - } + case JOB_TYPE_COMPRESS_CHUNKS: case JOB_TYPE_REORDER: case JOB_TYPE_CUSTOM: return get_role_oid(NameStr(job->fd.owner), false); @@ -570,7 +563,6 @@ bgw_job_tuple_delete(TupleInfo *ti, void *data) /* Delete any policy args associated with this job */ ts_bgw_policy_drop_chunks_delete_row_only_by_job_id(job_id); - ts_bgw_policy_compress_chunks_delete_row_only_by_job_id(job_id); /* Delete any stats in bgw_policy_chunk_stats related to this job */ ts_bgw_policy_chunk_stats_delete_row_only_by_job_id(job_id); diff --git a/src/bgw_policy/CMakeLists.txt b/src/bgw_policy/CMakeLists.txt index 35d2602cc..abfc8795f 100644 --- a/src/bgw_policy/CMakeLists.txt +++ b/src/bgw_policy/CMakeLists.txt @@ -1,6 +1,5 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks.c - ${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks.c ${CMAKE_CURRENT_SOURCE_DIR}/policy.c ${CMAKE_CURRENT_SOURCE_DIR}/chunk_stats.c ) diff --git a/src/bgw_policy/compress_chunks.c b/src/bgw_policy/compress_chunks.c deleted file mode 100644 index 2f08d4720..000000000 --- a/src/bgw_policy/compress_chunks.c +++ /dev/null @@ -1,163 +0,0 @@ -/* - * This file and its contents are licensed under the Apache License 2.0. - * Please see the included NOTICE for copyright information and - * LICENSE-APACHE for a copy of the license. - */ -#include -#include - -#include "bgw/job.h" -#include "catalog.h" -#include "compress_chunks.h" -#include "hypertable.h" -#include "interval.h" -#include "policy.h" -#include "scanner.h" -#include "scan_iterator.h" -#include "utils.h" - -#include "compat.h" - -static ScanTupleResult -bgw_policy_compress_chunks_tuple_found(TupleInfo *ti, void *const data) -{ - BgwPolicyCompressChunks **policy = data; - bool nulls[Natts_bgw_policy_compress_chunks]; - Datum values[Natts_bgw_policy_compress_chunks]; - bool should_free; - HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); - - heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls); - - *policy = MemoryContextAllocZero(ti->mctx, sizeof(BgwPolicyCompressChunks)); - Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]); - (*policy)->fd.job_id = - DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]); - - Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]); - (*policy)->fd.hypertable_id = DatumGetInt32( - values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]); - - Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]); - - (*policy)->fd.older_than = *ts_interval_from_tuple( - values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]); - - if (should_free) - heap_freetuple(tuple); - - return SCAN_CONTINUE; -} - -static ScanTupleResult -compress_policy_delete_row_tuple_found(TupleInfo *ti, void *const data) -{ - CatalogSecurityContext sec_ctx; - - ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); - ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti)); - ts_catalog_restore_user(&sec_ctx); - - return SCAN_CONTINUE; -} - -/* deletes only from the compress_chunks policy table. need to remove the job separately */ -bool -ts_bgw_policy_compress_chunks_delete_row_only_by_job_id(int32 job_id) -{ - ScanKeyData scankey[1]; - - ScanKeyInit(&scankey[0], - Anum_bgw_policy_compress_chunks_pkey_job_id, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(job_id)); - - return ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS, - BGW_POLICY_COMPRESS_CHUNKS_PKEY, - scankey, - 1, - compress_policy_delete_row_tuple_found, - RowExclusiveLock, - BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, - NULL); -} - -BgwPolicyCompressChunks * -ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id) -{ - ScanKeyData scankey[1]; - BgwPolicyCompressChunks *ret = NULL; - - ScanKeyInit(&scankey[0], - Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(hypertable_id)); - - ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS, - BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY, - scankey, - 1, - bgw_policy_compress_chunks_tuple_found, - RowExclusiveLock, - BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, - (void *) &ret); - - return ret; -} - -void -ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy) -{ - TupleDesc tupdesc; - CatalogSecurityContext sec_ctx; - Datum values[Natts_bgw_policy_compress_chunks]; - bool nulls[Natts_bgw_policy_compress_chunks] = { false }; - HeapTuple ht_older_than; - Catalog *catalog = ts_catalog_get(); - Relation rel = - table_open(catalog_get_table_id(catalog, BGW_POLICY_COMPRESS_CHUNKS), RowExclusiveLock); - - tupdesc = RelationGetDescr(rel); - - values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)] = - Int32GetDatum(policy->fd.job_id); - values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)] = - Int32GetDatum(policy->fd.hypertable_id); - - ht_older_than = ts_interval_form_heaptuple(&policy->fd.older_than); - - values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)] = - HeapTupleGetDatum(ht_older_than); - - ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); - ts_catalog_insert_values(rel, tupdesc, values, nulls); - ts_catalog_restore_user(&sec_ctx); - heap_freetuple(ht_older_than); - table_close(rel, RowExclusiveLock); -} - -BgwPolicyCompressChunks * -ts_bgw_policy_compress_chunks_find_by_job(int32 job_id) -{ - ScanKeyData scankey[1]; - BgwPolicyCompressChunks *ret = NULL; - - ScanKeyInit(&scankey[0], - Anum_bgw_policy_compress_chunks_pkey_job_id, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(job_id)); - - ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS, - BGW_POLICY_COMPRESS_CHUNKS_PKEY, - scankey, - 1, - bgw_policy_compress_chunks_tuple_found, - RowExclusiveLock, - BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, - (void *) &ret); - - return ret; -} diff --git a/src/bgw_policy/compress_chunks.h b/src/bgw_policy/compress_chunks.h deleted file mode 100644 index 009feace8..000000000 --- a/src/bgw_policy/compress_chunks.h +++ /dev/null @@ -1,23 +0,0 @@ -/* - * This file and its contents are licensed under the Apache License 2.0. - * Please see the included NOTICE for copyright information and - * LICENSE-APACHE for a copy of the license. - */ - -#ifndef TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H -#define TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H - -#include "catalog.h" -#include "export.h" - -typedef struct BgwPolicyCompressChunks -{ - FormData_bgw_policy_compress_chunks fd; -} BgwPolicyCompressChunks; - -extern TSDLLEXPORT BgwPolicyCompressChunks *ts_bgw_policy_compress_chunks_find_by_job(int32 job_id); -extern TSDLLEXPORT BgwPolicyCompressChunks * -ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id); -extern TSDLLEXPORT void ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy); -extern TSDLLEXPORT bool ts_bgw_policy_compress_chunks_delete_row_only_by_job_id(int32 job_id); -#endif /* TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H */ diff --git a/src/bgw_policy/policy.c b/src/bgw_policy/policy.c index dd126c2fb..e401d688a 100644 --- a/src/bgw_policy/policy.c +++ b/src/bgw_policy/policy.c @@ -7,7 +7,6 @@ #include #include "bgw/job.h" -#include "bgw_policy/compress_chunks.h" #include "bgw_policy/drop_chunks.h" #include "policy.h" @@ -28,11 +27,6 @@ ts_bgw_policy_delete_by_hypertable_id(int32 hypertable_id) if (policy) ts_bgw_job_delete_by_id(((BgwPolicyDropChunks *) policy)->job_id); - policy = ts_bgw_policy_compress_chunks_find_by_hypertable(hypertable_id); - - if (policy) - ts_bgw_job_delete_by_id(((BgwPolicyCompressChunks *) policy)->fd.job_id); - jobs = ts_bgw_job_find_by_hypertable_id(hypertable_id); foreach (lc, jobs) { diff --git a/src/catalog.c b/src/catalog.c index aa63533c8..05cf02735 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -107,10 +107,6 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = { .schema_name = CATALOG_SCHEMA_NAME, .table_name = COMPRESSION_CHUNK_SIZE_TABLE_NAME, }, - [BGW_POLICY_COMPRESS_CHUNKS] = { - .schema_name = CONFIG_SCHEMA_NAME, - .table_name = BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME, - }, [REMOTE_TXN] = { .schema_name = CATALOG_SCHEMA_NAME, .table_name = REMOTE_TXN_TABLE_NAME, @@ -265,13 +261,6 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES] [COMPRESSION_CHUNK_SIZE_PKEY] = "compression_chunk_size_pkey", }, }, - [BGW_POLICY_COMPRESS_CHUNKS] = { - .length = _MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX, - .names = (char *[]) { - [BGW_POLICY_COMPRESS_CHUNKS_PKEY] = "bgw_policy_compress_chunks_pkey", - [BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY] = "bgw_policy_compress_chunks_hypertable_id_key", - }, - }, [REMOTE_TXN] = { .length = _MAX_REMOTE_TXN_INDEX, .names = (char *[]) { @@ -300,7 +289,6 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = { [CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG] = NULL, [HYPERTABLE_COMPRESSION] = NULL, [COMPRESSION_CHUNK_SIZE] = NULL, - [BGW_POLICY_COMPRESS_CHUNKS] = NULL, [REMOTE_TXN] = NULL, }; diff --git a/src/catalog.h b/src/catalog.h index 5afbcda93..8d1cc5ac0 100644 --- a/src/catalog.h +++ b/src/catalog.h @@ -55,7 +55,6 @@ typedef enum CatalogTable CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG, HYPERTABLE_COMPRESSION, COMPRESSION_CHUNK_SIZE, - BGW_POLICY_COMPRESS_CHUNKS, REMOTE_TXN, _MAX_CATALOG_TABLES, } CatalogTable; @@ -1249,50 +1248,6 @@ typedef enum Anum_compression_chunk_size_pkey #define Natts_compression_chunk_size_pkey (_Anum_compression_chunk_size_pkey_max - 1) -#define BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME "bgw_policy_compress_chunks" -typedef enum Anum_bgw_policy_compress_chunks -{ - Anum_bgw_policy_compress_chunks_job_id = 1, - Anum_bgw_policy_compress_chunks_hypertable_id, - Anum_bgw_policy_compress_chunks_older_than, - _Anum_bgw_policy_compress_chunks_max, -} Anum_bgw_policy_compress_chunks; - -#define Natts_bgw_policy_compress_chunks (_Anum_bgw_policy_compress_chunks_max - 1) - -typedef struct FormData_bgw_policy_compress_chunks -{ - int32 job_id; - int32 hypertable_id; - FormData_ts_interval older_than; -} FormData_bgw_policy_compress_chunks; - -typedef FormData_bgw_policy_compress_chunks *Form_bgw_policy_compress_chunks; - -enum -{ - BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY = 0, - BGW_POLICY_COMPRESS_CHUNKS_PKEY, - _MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX, -}; - -typedef enum Anum_bgw_policy_compress_chunks_hypertable_id_key -{ - Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id = 1, - _Anum_bgw_policy_compress_chunks_hypertable_id_key_max, -} Anum_bgw_policy_compress_chunks_hypertable_id_key; - -#define Natts_bgw_policy_compress_chunks_hypertable_id_key \ - (_Anum_bgw_policy_compress_chunks_hypertable_id_key_max - 1) - -typedef enum Anum_bgw_policy_compress_chunks_pkey -{ - Anum_bgw_policy_compress_chunks_pkey_job_id = 1, - _Anum_bgw_policy_compress_chunks_pkey_max, -} Anum_bgw_policy_compress_chunks_pkey; - -#define Natts_bgw_policy_compress_chunks_pkey (_Anum_bgw_policy_compress_chunks_pkey_max - 1) - /* * The maximum number of indexes a catalog table can have. * This needs to be bumped in case of new catalog tables that have more indexes. diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 51a7904ef..ada6d5559 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -21,14 +21,15 @@ } /* bgw policy functions */ -CROSSMODULE_WRAPPER(add_compress_chunks_policy); CROSSMODULE_WRAPPER(add_retention_policy); +CROSSMODULE_WRAPPER(remove_retention_policy); +CROSSMODULE_WRAPPER(policy_compression_add); +CROSSMODULE_WRAPPER(policy_compression_proc); +CROSSMODULE_WRAPPER(policy_compression_remove); CROSSMODULE_WRAPPER(policy_reorder_add); CROSSMODULE_WRAPPER(policy_reorder_proc); CROSSMODULE_WRAPPER(policy_reorder_remove); CROSSMODULE_WRAPPER(alter_job_schedule); -CROSSMODULE_WRAPPER(remove_compress_chunks_policy); -CROSSMODULE_WRAPPER(remove_retention_policy); CROSSMODULE_WRAPPER(reorder_chunk); CROSSMODULE_WRAPPER(move_chunk); @@ -337,15 +338,16 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .gapfill_timestamptz_time_bucket = error_no_default_fn_pg_community, /* bgw policies */ - .add_compress_chunks_policy = error_no_default_fn_pg_community, - .add_retention_policy = error_no_default_fn_pg_community, + .policy_compression_add = error_no_default_fn_pg_community, + .policy_compression_proc = error_no_default_fn_pg_community, + .policy_compression_remove = error_no_default_fn_pg_community, .policy_reorder_add = error_no_default_fn_pg_community, .policy_reorder_proc = error_no_default_fn_pg_community, .policy_reorder_remove = error_no_default_fn_pg_community, + .add_retention_policy = error_no_default_fn_pg_community, + .remove_retention_policy = error_no_default_fn_pg_community, .alter_job_schedule = error_no_default_fn_pg_community, .bgw_policy_job_execute = bgw_policy_job_execute_default_fn, - .remove_compress_chunks_policy = error_no_default_fn_pg_community, - .remove_retention_policy = error_no_default_fn_pg_community, .move_chunk = error_no_default_fn_pg_enterprise, .reorder_chunk = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index dadd9f906..00dc8361d 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -49,12 +49,13 @@ typedef struct CrossModuleFunctions bool (*continuous_agg_materialize)(int32 materialization_id, ContinuousAggMatOptions *options); PGFunction add_retention_policy; + PGFunction remove_retention_policy; + PGFunction policy_compression_add; + PGFunction policy_compression_proc; + PGFunction policy_compression_remove; PGFunction policy_reorder_add; PGFunction policy_reorder_proc; PGFunction policy_reorder_remove; - PGFunction add_compress_chunks_policy; - PGFunction remove_retention_policy; - PGFunction remove_compress_chunks_policy; void (*create_upper_paths_hook)(PlannerInfo *, UpperRelationKind, RelOptInfo *, RelOptInfo *, TsRelType input_reltype, Hypertable *ht, void *extra); diff --git a/src/interval.c b/src/interval.c index 24a777b84..5436a7421 100644 --- a/src/interval.c +++ b/src/interval.c @@ -263,7 +263,7 @@ ts_interval_now_func_validate(Oid now_func_oid, Oid open_dim_type) ReleaseSysCache(tuple); } -static Datum +Datum ts_interval_from_now_func_get_datum(int64 interval, Oid time_dim_type, Oid now_func) { Datum now; @@ -314,8 +314,8 @@ noarg_integer_now_func_filter(Form_pg_proc form, void *arg) /* maybe this can be exported later if other parts of the code need * to access the integer_now_func */ -static Oid -get_integer_now_func(Dimension *open_dim) +Oid +ts_get_integer_now_func(Dimension *open_dim) { Oid rettype; Oid now_func; @@ -344,7 +344,7 @@ ts_get_now_internal(Dimension *open_dim) if (IS_INTEGER_TYPE(dim_post_part_type)) { Datum now_datum; - Oid now_func = get_integer_now_func(open_dim); + Oid now_func = ts_get_integer_now_func(open_dim); ts_interval_now_func_validate(now_func, dim_post_part_type); now_datum = OidFunctionCall0(now_func); return ts_time_value_to_internal(now_datum, dim_post_part_type); @@ -430,7 +430,7 @@ ts_interval_subtract_from_now(FormData_ts_interval *invl, Dimension *open_dim) } else { - Oid now_func = get_integer_now_func(open_dim); + Oid now_func = ts_get_integer_now_func(open_dim); ts_interval_now_func_validate(now_func, type_oid); if (InvalidOid == now_func) diff --git a/src/interval.h b/src/interval.h index b102cdd68..3145adfe5 100644 --- a/src/interval.h +++ b/src/interval.h @@ -24,4 +24,8 @@ TSDLLEXPORT int64 ts_get_now_internal(Dimension *open_dim); TSDLLEXPORT FormData_ts_interval * ts_interval_from_sql_input_internal(Dimension *open_dim, Datum interval, Oid interval_type, const char *parameter_name, const char *caller_name); +TSDLLEXPORT Datum ts_interval_from_now_func_get_datum(int64 interval, Oid time_dim_type, + Oid now_func); +TSDLLEXPORT Oid ts_get_integer_now_func(Dimension *open_dim); + #endif /* TIMESCALEDB_INTERVAL */ diff --git a/src/jsonb_utils.c b/src/jsonb_utils.c index 03364c4f8..971e54ac9 100644 --- a/src/jsonb_utils.c +++ b/src/jsonb_utils.c @@ -113,7 +113,7 @@ ts_jsonb_add_pair(JsonbParseState *state, JsonbValue *key, JsonbValue *value) } char * -ts_jsonb_get_str_field(Jsonb *jsonb, const char *key) +ts_jsonb_get_str_field(const Jsonb *jsonb, const char *key) { /* * `jsonb_object_field_text` returns NULL when the field is not found so @@ -136,7 +136,7 @@ ts_jsonb_get_str_field(Jsonb *jsonb, const char *key) } TimestampTz -ts_jsonb_get_time_field(Jsonb *jsonb, const char *key, bool *field_found) +ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key, bool *field_found) { Datum time_datum; char *time_str = ts_jsonb_get_str_field(jsonb, key); @@ -157,7 +157,7 @@ ts_jsonb_get_time_field(Jsonb *jsonb, const char *key, bool *field_found) } int32 -ts_jsonb_get_int32_field(Jsonb *json, const char *key, bool *field_found) +ts_jsonb_get_int32_field(const Jsonb *json, const char *key, bool *field_found) { Datum int_datum; char *int_str = ts_jsonb_get_str_field(json, key); @@ -175,7 +175,7 @@ ts_jsonb_get_int32_field(Jsonb *json, const char *key, bool *field_found) } int64 -ts_jsonb_get_int64_field(Jsonb *json, const char *key, bool *field_found) +ts_jsonb_get_int64_field(const Jsonb *json, const char *key, bool *field_found) { Datum int_datum; char *int_str = ts_jsonb_get_str_field(json, key); @@ -193,7 +193,7 @@ ts_jsonb_get_int64_field(Jsonb *json, const char *key, bool *field_found) } Interval * -ts_jsonb_get_interval_field(Jsonb *json, const char *key) +ts_jsonb_get_interval_field(const Jsonb *json, const char *key) { Datum interval_datum; char *interval_str = ts_jsonb_get_str_field(json, key); @@ -201,7 +201,8 @@ ts_jsonb_get_interval_field(Jsonb *json, const char *key) if (interval_str == NULL) return NULL; - interval_datum = DirectFunctionCall3(interval_in, CStringGetDatum("1 day"), InvalidOid, -1); + interval_datum = + DirectFunctionCall3(interval_in, CStringGetDatum(interval_str), InvalidOid, -1); return DatumGetIntervalP(interval_datum); } diff --git a/src/jsonb_utils.h b/src/jsonb_utils.h index 6e4436a4c..5e3e2f282 100644 --- a/src/jsonb_utils.h +++ b/src/jsonb_utils.h @@ -26,11 +26,13 @@ extern TSDLLEXPORT void ts_jsonb_add_numeric(JsonbParseState *state, const char extern void ts_jsonb_add_value(JsonbParseState *state, const char *key, JsonbValue *value); -extern TSDLLEXPORT char *ts_jsonb_get_str_field(Jsonb *jsonb, const char *key); -extern TSDLLEXPORT Interval *ts_jsonb_get_interval_field(Jsonb *jsonb, const char *key); -extern TSDLLEXPORT TimestampTz ts_jsonb_get_time_field(Jsonb *jsonb, const char *key, +extern TSDLLEXPORT char *ts_jsonb_get_str_field(const Jsonb *jsonb, const char *key); +extern TSDLLEXPORT Interval *ts_jsonb_get_interval_field(const Jsonb *jsonb, const char *key); +extern TSDLLEXPORT TimestampTz ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key, bool *field_found); -extern TSDLLEXPORT int32 ts_jsonb_get_int32_field(Jsonb *json, const char *key, bool *field_found); -extern TSDLLEXPORT int64 ts_jsonb_get_int64_field(Jsonb *json, const char *key, bool *field_found); +extern TSDLLEXPORT int32 ts_jsonb_get_int32_field(const Jsonb *json, const char *key, + bool *field_found); +extern TSDLLEXPORT int64 ts_jsonb_get_int64_field(const Jsonb *json, const char *key, + bool *field_found); #endif /* TIMESCALEDB_JSONB_UTILS_H */ diff --git a/test/expected/extension.out b/test/expected/extension.out index f579b5577..4bd36ec31 100644 --- a/test/expected/extension.out +++ b/test/expected/extension.out @@ -19,7 +19,7 @@ WHERE oid IN ( ORDER BY proname; proname ---------------------------------- - add_compress_chunks_policy + add_compression_policy add_data_node add_dimension add_reorder_policy @@ -54,7 +54,7 @@ WHERE oid IN ( locf move_chunk refresh_continuous_aggregate - remove_compress_chunks_policy + remove_compression_policy remove_reorder_policy remove_retention_policy reorder_chunk diff --git a/tsl/src/bgw_policy/CMakeLists.txt b/tsl/src/bgw_policy/CMakeLists.txt index 5594fc479..1d176cb91 100644 --- a/tsl/src/bgw_policy/CMakeLists.txt +++ b/tsl/src/bgw_policy/CMakeLists.txt @@ -1,7 +1,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/reorder_api.c ${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks_api.c - ${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks_api.c + ${CMAKE_CURRENT_SOURCE_DIR}/compression_api.c ${CMAKE_CURRENT_SOURCE_DIR}/job.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/bgw_policy/compress_chunks_api.h b/tsl/src/bgw_policy/compress_chunks_api.h deleted file mode 100644 index 4854bc4f0..000000000 --- a/tsl/src/bgw_policy/compress_chunks_api.h +++ /dev/null @@ -1,15 +0,0 @@ -/* - * This file and its contents are licensed under the Timescale License. - * Please see the included NOTICE for copyright information and - * LICENSE-TIMESCALE for a copy of the license. - */ - -#ifndef TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H -#define TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H - -#include - -/* User-facing API functions */ -extern Datum compress_chunks_add_policy(PG_FUNCTION_ARGS); -extern Datum compress_chunks_remove_policy(PG_FUNCTION_ARGS); -#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H */ diff --git a/tsl/src/bgw_policy/compress_chunks_api.c b/tsl/src/bgw_policy/compression_api.c similarity index 50% rename from tsl/src/bgw_policy/compress_chunks_api.c rename to tsl/src/bgw_policy/compression_api.c index d8638a292..25dae5522 100644 --- a/tsl/src/bgw_policy/compress_chunks_api.c +++ b/tsl/src/bgw_policy/compression_api.c @@ -9,80 +9,140 @@ #include #include -#include "bgw_policy/compress_chunks.h" #include "bgw/job.h" -#include "compress_chunks_api.h" +#include "compression_api.h" #include "errors.h" #include "hypertable.h" #include "hypertable_cache.h" #include "interval.h" #include "license.h" #include "utils.h" +#include "jsonb_utils.h" +#include "bgw_policy/job.h" /* * Default scheduled interval for compress jobs = default chunk length. * If this is non-timestamp based hypertable, then default is 1 day */ #define DEFAULT_SCHEDULE_INTERVAL \ - DatumGetIntervalP(DirectFunctionCall7(make_interval, \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(1), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Float8GetDatum(0))) + DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 day"), InvalidOid, -1)) + /* Default max runtime is unlimited for compress chunks */ #define DEFAULT_MAX_RUNTIME \ - DatumGetIntervalP(DirectFunctionCall7(make_interval, \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Float8GetDatum(0))) + DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("0"), InvalidOid, -1)) + /* Right now, there is an infinite number of retries for compress_chunks jobs */ #define DEFAULT_MAX_RETRIES -1 /* Default retry period for reorder_jobs is currently 1 hour */ #define DEFAULT_RETRY_PERIOD \ - DatumGetIntervalP(DirectFunctionCall7(make_interval, \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(0), \ - Int32GetDatum(1), \ - Int32GetDatum(0), \ - Float8GetDatum(0))) + DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 hour"), InvalidOid, -1)) + +#define POLICY_COMPRESSION_PROC_NAME "policy_compression" +#define CONFIG_KEY_HYPERTABLE_ID "hypertable_id" +#define CONFIG_KEY_OLDER_THAN "older_than" + +int32 +policy_compression_get_hypertable_id(const Jsonb *config) +{ + bool found; + int32 hypertable_id = ts_jsonb_get_int32_field(config, CONFIG_KEY_HYPERTABLE_ID, &found); + + if (!found) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not find hypertable_id in config for job"))); + + return hypertable_id; +} + +int64 +policy_compression_get_older_than_int(const Jsonb *config) +{ + bool found; + int32 hypertable_id = ts_jsonb_get_int64_field(config, CONFIG_KEY_OLDER_THAN, &found); + + if (!found) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not find older_than in config for job"))); + + return hypertable_id; +} + +Interval * +policy_compression_get_older_than_interval(const Jsonb *config) +{ + Interval *interval = ts_jsonb_get_interval_field(config, CONFIG_KEY_OLDER_THAN); + + if (interval == NULL) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not find older_than in config for job"))); + + return interval; +} + +static bool +compression_lag_equal(Oid partitioning_type, Jsonb *config, Oid lag_type, Datum lag_datum) +{ + if (IS_INTEGER_TYPE(partitioning_type)) + { + int64 config_value = policy_compression_get_older_than_int(config); + + switch (lag_type) + { + case INT2OID: + return config_value == DatumGetInt16(lag_datum); + case INT4OID: + return config_value == DatumGetInt32(lag_datum); + case INT8OID: + return config_value == DatumGetInt64(lag_datum); + default: + return false; + } + } + else + { + if (lag_type != INTERVALOID) + return false; + + Interval *config_value = policy_compression_get_older_than_interval(config); + + return DatumGetBool( + DirectFunctionCall2(interval_eq, IntervalPGetDatum(config_value), lag_datum)); + } +} Datum -compress_chunks_add_policy(PG_FUNCTION_ARGS) +policy_compression_proc(PG_FUNCTION_ARGS) +{ + if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1)) + PG_RETURN_VOID(); + + policy_compression_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1)); + + PG_RETURN_VOID(); +} + +Datum +policy_compression_add(PG_FUNCTION_ARGS) { NameData application_name; NameData compress_chunks_name; NameData proc_name, proc_schema, owner; int32 job_id; - BgwPolicyCompressChunks *existing; Oid ht_oid = PG_GETARG_OID(0); Datum older_than_datum = PG_GETARG_DATUM(1); Oid older_than_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1); bool if_not_exists = PG_GETARG_BOOL(2); Interval *default_schedule_interval = DEFAULT_SCHEDULE_INTERVAL; - BgwPolicyCompressChunks policy; Hypertable *hypertable; Cache *hcache; Dimension *dim; - FormData_ts_interval *older_than; ts_hypertable_permissions_check(ht_oid, GetUserId()); Oid owner_id = ts_hypertable_permissions_check(ht_oid, GetUserId()); - older_than = ts_interval_from_sql_input(ht_oid, - older_than_datum, - older_than_type, - "older_than", - "compress_chunks_add_policy"); - /* check if this is a table with compression enabled */ hypertable = ts_hypertable_cache_get_cache_and_entry(ht_oid, CACHE_FLAG_NONE, &hcache); if (!TS_HYPERTABLE_HAS_COMPRESSION(hypertable)) @@ -97,9 +157,14 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS) ts_bgw_job_validate_job_owner(owner_id, JOB_TYPE_COMPRESS_CHUNKS); /* Make sure that an existing policy doesn't exist on this hypertable */ - existing = ts_bgw_policy_compress_chunks_find_by_hypertable(hypertable->fd.id); + List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_COMPRESSION_PROC_NAME, + INTERNAL_SCHEMA_NAME, + hypertable->fd.id); - if (existing != NULL) + dim = hyperspace_get_open_dimension(hypertable->space, 0); + Oid partitioning_type = ts_dimension_get_partition_type(dim); + + if (jobs != NIL) { if (!if_not_exists) { @@ -109,7 +174,12 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS) errmsg("compress chunks policy already exists for hypertable \"%s\"", get_rel_name(ht_oid)))); } - if (ts_interval_equal(&existing->fd.older_than, older_than)) + Assert(list_length(jobs) == 1); + BgwJob *existing = linitial(jobs); + if (compression_lag_equal(partitioning_type, + existing->fd.config, + older_than_type, + older_than_datum)) { /* If all arguments are the same, do nothing */ ts_cache_release(hcache); @@ -127,7 +197,6 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS) PG_RETURN_INT32(-1); } } - dim = hyperspace_get_open_dimension(hypertable->space, 0); if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim))) { @@ -138,10 +207,41 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS) /* insert a new job into jobs table */ namestrcpy(&application_name, "Compress Chunks Background Job"); namestrcpy(&compress_chunks_name, "compress_chunks"); - namestrcpy(&proc_name, ""); - namestrcpy(&proc_schema, ""); + namestrcpy(&proc_name, POLICY_COMPRESSION_PROC_NAME); + namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME); namestrcpy(&owner, GetUserNameFromId(owner_id, false)); + JsonbParseState *parse_state = NULL; + + pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL); + ts_jsonb_add_int32(parse_state, CONFIG_KEY_HYPERTABLE_ID, hypertable->fd.id); + + switch (older_than_type) + { + case INTERVALOID: + ts_jsonb_add_interval(parse_state, + CONFIG_KEY_OLDER_THAN, + DatumGetIntervalP(older_than_datum)); + break; + case INT2OID: + ts_jsonb_add_int64(parse_state, CONFIG_KEY_OLDER_THAN, DatumGetInt16(older_than_datum)); + break; + case INT4OID: + ts_jsonb_add_int64(parse_state, CONFIG_KEY_OLDER_THAN, DatumGetInt32(older_than_datum)); + break; + case INT8OID: + ts_jsonb_add_int64(parse_state, CONFIG_KEY_OLDER_THAN, DatumGetInt64(older_than_datum)); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unsupported datatype for older_than: %s", + format_type_be(older_than_type)))); + } + + JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL); + Jsonb *config = JsonbValueToJsonb(result); + job_id = ts_bgw_job_insert_relation(&application_name, &compress_chunks_name, default_schedule_interval, @@ -153,34 +253,26 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS) &owner, true, hypertable->fd.id, - NULL); + config); - policy = (BgwPolicyCompressChunks){ .fd = { - .job_id = job_id, - .hypertable_id = ts_hypertable_relid_to_id(ht_oid), - .older_than = *older_than, - } }; - - /* Now, insert a new row in the compress_chunks args table */ - ts_bgw_policy_compress_chunks_insert(&policy); ts_cache_release(hcache); PG_RETURN_INT32(job_id); } Datum -compress_chunks_remove_policy(PG_FUNCTION_ARGS) +policy_compression_remove(PG_FUNCTION_ARGS) { Oid hypertable_oid = PG_GETARG_OID(0); bool if_exists = PG_GETARG_BOOL(1); /* Remove the job, then remove the policy */ int ht_id = ts_hypertable_relid_to_id(hypertable_oid); - BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_hypertable(ht_id); - ts_hypertable_permissions_check(hypertable_oid, GetUserId()); - - if (policy == NULL) + List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_COMPRESSION_PROC_NAME, + INTERNAL_SCHEMA_NAME, + ht_id); + if (jobs == NIL) { if (!if_exists) ereport(ERROR, @@ -195,7 +287,12 @@ compress_chunks_remove_policy(PG_FUNCTION_ARGS) } } - ts_bgw_job_delete_by_id(policy->fd.job_id); + ts_hypertable_permissions_check(hypertable_oid, GetUserId()); + + Assert(list_length(jobs) == 1); + BgwJob *job = linitial(jobs); + + ts_bgw_job_delete_by_id(job->fd.id); PG_RETURN_BOOL(true); } diff --git a/tsl/src/bgw_policy/compression_api.h b/tsl/src/bgw_policy/compression_api.h new file mode 100644 index 000000000..4e1ff28d8 --- /dev/null +++ b/tsl/src/bgw_policy/compression_api.h @@ -0,0 +1,23 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#ifndef TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H +#define TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H + +#include +#include +#include + +/* User-facing API functions */ +extern Datum policy_compression_add(PG_FUNCTION_ARGS); +extern Datum policy_compression_remove(PG_FUNCTION_ARGS); +extern Datum policy_compression_proc(PG_FUNCTION_ARGS); + +int32 policy_compression_get_hypertable_id(const Jsonb *config); +int64 policy_compression_get_older_than_int(const Jsonb *config); +Interval *policy_compression_get_older_than_interval(const Jsonb *config); + +#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H */ diff --git a/tsl/src/bgw_policy/job.c b/tsl/src/bgw_policy/job.c index ed67d9760..710e4912f 100644 --- a/tsl/src/bgw_policy/job.c +++ b/tsl/src/bgw_policy/job.c @@ -25,8 +25,8 @@ #include "bgw/job_stat.h" #include "bgw_policy/chunk_stats.h" #include "bgw_policy/drop_chunks.h" -#include "bgw_policy/compress_chunks.h" #include "bgw_policy/reorder_api.h" +#include "bgw_policy/compression_api.h" #include "compression/compress_utils.h" #include "continuous_aggs/materialize.h" #include "continuous_aggs/job.h" @@ -89,14 +89,65 @@ get_chunk_id_to_reorder(int32 job_id, Hypertable *ht) -1); } +static int64 +get_compression_window_end_value(Dimension *dim, const Jsonb *config) +{ + Oid partitioning_type = ts_dimension_get_partition_type(dim); + + if (IS_INTEGER_TYPE(partitioning_type)) + { + int64 lag = policy_compression_get_older_than_int(config); + Oid now_func = ts_get_integer_now_func(dim); + + if (InvalidOid == now_func) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("integer_now function must be set"))); + + return ts_time_value_to_internal(ts_interval_from_now_func_get_datum(lag, + partitioning_type, + now_func), + partitioning_type); + } + else + { + Datum res = TimestampTzGetDatum(GetCurrentTimestamp()); + Interval *lag = policy_compression_get_older_than_interval(config); + + switch (partitioning_type) + { + case TIMESTAMPOID: + res = DirectFunctionCall1(timestamptz_timestamp, res); + res = DirectFunctionCall2(timestamp_mi_interval, res, IntervalPGetDatum(lag)); + + return ts_time_value_to_internal(res, partitioning_type); + case TIMESTAMPTZOID: + res = DirectFunctionCall2(timestamptz_mi_interval, res, IntervalPGetDatum(lag)); + + return ts_time_value_to_internal(res, partitioning_type); + case DATEOID: + res = DirectFunctionCall1(timestamptz_timestamp, res); + res = DirectFunctionCall2(timestamp_mi_interval, res, IntervalPGetDatum(lag)); + res = DirectFunctionCall1(timestamp_date, res); + + return ts_time_value_to_internal(res, partitioning_type); + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unknown time type OID %d", partitioning_type))); + pg_unreachable(); + } + } +} + static int32 -get_chunk_to_compress(Hypertable *ht, FormData_ts_interval *older_than) +get_chunk_to_compress(Hypertable *ht, const Jsonb *config) { Dimension *open_dim = hyperspace_get_open_dimension(ht->space, 0); StrategyNumber end_strategy = BTLessStrategyNumber; - Oid partitioning_type = ts_dimension_get_partition_type(open_dim); - int64 end_value = ts_time_value_to_internal(ts_interval_subtract_from_now(older_than, open_dim), - partitioning_type); + + int64 end_value = get_compression_window_end_value(open_dim, config); + return ts_dimension_slice_get_chunkid_to_compress(open_dim->fd.id, InvalidStrategy, /*start_strategy*/ -1, /*start_value*/ @@ -105,7 +156,7 @@ get_chunk_to_compress(Hypertable *ht, FormData_ts_interval *older_than) } bool -execute_reorder_policy(int32 job_id, Jsonb *config, reorder_func reorder, bool fast_continue) +policy_reorder_execute(int32 job_id, Jsonb *config, reorder_func reorder, bool fast_continue) { int chunk_id; bool started = false; @@ -156,8 +207,10 @@ execute_reorder_policy(int32 job_id, Jsonb *config, reorder_func reorder, bool f if (fast_continue && get_chunk_id_to_reorder(job_id, ht) != -1) { - BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, true); - enable_fast_restart(job, "reorder"); + BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, false); + + if (job != NULL) + enable_fast_restart(job, "reorder"); } commit: @@ -289,16 +342,14 @@ execute_materialize_continuous_aggregate(BgwJob *job) } bool -execute_compress_chunks_policy(BgwJob *job) +policy_compression_execute(int32 job_id, Jsonb *config) { bool started = false; - BgwPolicyCompressChunks *args; Oid table_relid; Hypertable *ht; Cache *hcache; int32 chunkid; Chunk *chunk = NULL; - int job_id = job->fd.id; if (!IsTransactionOrTransactionBlock()) { @@ -307,19 +358,10 @@ execute_compress_chunks_policy(BgwJob *job) PushActiveSnapshot(GetTransactionSnapshot()); } - /* Get the arguments from the compress_chunks_policy table */ - args = ts_bgw_policy_compress_chunks_find_by_job(job_id); - - if (args == NULL) - ereport(ERROR, - (errcode(ERRCODE_TS_INTERNAL_ERROR), - errmsg("could not run compress_chunks policy #%d because no args in policy table", - job_id))); - - table_relid = ts_hypertable_id_to_relid(args->fd.hypertable_id); + table_relid = ts_hypertable_id_to_relid(policy_compression_get_hypertable_id(config)); ht = ts_hypertable_cache_get_cache_and_entry(table_relid, CACHE_FLAG_NONE, &hcache); - chunkid = get_chunk_to_compress(ht, &args->fd.older_than); + chunkid = get_chunk_to_compress(ht, config); if (chunkid == INVALID_CHUNK_ID) { elog(NOTICE, @@ -338,9 +380,14 @@ execute_compress_chunks_policy(BgwJob *job) NameStr(chunk->fd.table_name)); } - chunkid = get_chunk_to_compress(ht, &args->fd.older_than); + chunkid = get_chunk_to_compress(ht, config); if (chunkid != INVALID_CHUNK_ID) - enable_fast_restart(job, "compress_chunks"); + { + BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, false); + + if (job != NULL) + enable_fast_restart(job, "compression"); + } ts_cache_release(hcache); if (started) @@ -479,9 +526,8 @@ tsl_bgw_policy_job_execute(BgwJob *job) return execute_drop_chunks_policy(job->fd.id); case JOB_TYPE_CONTINUOUS_AGGREGATE: return execute_materialize_continuous_aggregate(job); - case JOB_TYPE_COMPRESS_CHUNKS: - return execute_compress_chunks_policy(job); + case JOB_TYPE_COMPRESS_CHUNKS: case JOB_TYPE_REORDER: case JOB_TYPE_CUSTOM: return job_execute(job); diff --git a/tsl/src/bgw_policy/job.h b/tsl/src/bgw_policy/job.h index 1432e2ecd..93fc2a60f 100644 --- a/tsl/src/bgw_policy/job.h +++ b/tsl/src/bgw_policy/job.h @@ -18,10 +18,10 @@ typedef void (*reorder_func)(Oid tableOid, Oid indexOid, bool verbose, Oid wait_ Oid destination_tablespace, Oid index_tablespace); /* Functions exposed only for testing */ -extern bool execute_reorder_policy(int32 job_id, Jsonb *config, reorder_func reorder, +extern bool policy_reorder_execute(int32 job_id, Jsonb *config, reorder_func reorder, bool fast_continue); extern bool execute_drop_chunks_policy(int32 job_id); -extern bool execute_compress_chunks_policy(BgwJob *job); +extern bool policy_compression_execute(int32 job_id, Jsonb *config); extern bool tsl_bgw_policy_job_execute(BgwJob *job); extern Datum bgw_policy_alter_job_schedule(PG_FUNCTION_ARGS); diff --git a/tsl/src/bgw_policy/reorder_api.c b/tsl/src/bgw_policy/reorder_api.c index c6c0dda06..25407f7af 100644 --- a/tsl/src/bgw_policy/reorder_api.c +++ b/tsl/src/bgw_policy/reorder_api.c @@ -52,7 +52,7 @@ #define POLICY_REORDER_PROC_NAME "policy_reorder" int32 -policy_reorder_get_hypertable_id(Jsonb *config) +policy_reorder_get_hypertable_id(const Jsonb *config) { bool found; int32 hypertable_id = ts_jsonb_get_int32_field(config, CONFIG_KEY_HYPERTABLE_ID, &found); @@ -66,7 +66,7 @@ policy_reorder_get_hypertable_id(Jsonb *config) } char * -policy_reorder_get_index_name(Jsonb *config) +policy_reorder_get_index_name(const Jsonb *config) { char *index_name = NULL; @@ -108,10 +108,10 @@ check_valid_index(Hypertable *ht, Name index_name) Datum policy_reorder_proc(PG_FUNCTION_ARGS) { - int32 job_id = PG_GETARG_INT32(0); - Jsonb *config = PG_GETARG_JSONB_P(1); + if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1)) + PG_RETURN_VOID(); - execute_reorder_policy(job_id, config, reorder_chunk, true); + policy_reorder_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1), reorder_chunk, true); PG_RETURN_VOID(); } @@ -235,7 +235,6 @@ policy_reorder_remove(PG_FUNCTION_ARGS) Oid hypertable_oid = PG_GETARG_OID(0); bool if_exists = PG_GETARG_BOOL(1); - /* Remove the job, then remove the policy */ int ht_id = ts_hypertable_relid_to_id(hypertable_oid); List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REORDER_PROC_NAME, diff --git a/tsl/src/bgw_policy/reorder_api.h b/tsl/src/bgw_policy/reorder_api.h index e73b0481f..a9f7f2df2 100644 --- a/tsl/src/bgw_policy/reorder_api.h +++ b/tsl/src/bgw_policy/reorder_api.h @@ -14,7 +14,7 @@ extern Datum policy_reorder_add(PG_FUNCTION_ARGS); extern Datum policy_reorder_remove(PG_FUNCTION_ARGS); extern Datum policy_reorder_proc(PG_FUNCTION_ARGS); -extern int32 policy_reorder_get_hypertable_id(Jsonb *config); -extern char *policy_reorder_get_index_name(Jsonb *config); +extern int32 policy_reorder_get_hypertable_id(const Jsonb *config); +extern char *policy_reorder_get_index_name(const Jsonb *config); #endif /* TIMESCALEDB_TSL_BGW_POLICY_REORDER_API_H */ diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c index abc46d8f0..f8b36c450 100644 --- a/tsl/src/compression/create.c +++ b/tsl/src/compression/create.c @@ -42,7 +42,6 @@ #include "license.h" #include "trigger.h" #include "utils.h" -#include "bgw_policy/compress_chunks.h" /* entrypoint * tsl_process_compress_table : is the entry point. @@ -815,13 +814,10 @@ static void check_modify_compression_options(Hypertable *ht, WithClauseResult *with_clause_options) { bool compress_enable = DatumGetBool(with_clause_options[CompressEnabled].parsed); - bool compression_has_policy; bool compressed_chunks_exist; bool compression_already_enabled = TS_HYPERTABLE_HAS_COMPRESSION(ht); compressed_chunks_exist = compression_already_enabled && ts_chunk_exists_with_compression(ht->fd.id); - compression_has_policy = - compression_already_enabled && ts_bgw_policy_compress_chunks_find_by_hypertable(ht->fd.id); if (compressed_chunks_exist) ereport(ERROR, @@ -829,12 +825,6 @@ check_modify_compression_options(Hypertable *ht, WithClauseResult *with_clause_o errmsg("cannot change compression options as compressed chunks already exist for " "this table"))); - if (compression_has_policy) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot change compression options as a compression policy exists on the " - "table"))); - /* Require both order by and segment by when altering if they were previously set because * otherwise it's not clear what the default value means: does it mean leave as-is or is it an * empty list. */ diff --git a/tsl/src/init.c b/tsl/src/init.c index 392358216..23caea2fe 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -6,7 +6,7 @@ #include #include -#include "bgw_policy/compress_chunks_api.h" +#include "bgw_policy/compression_api.h" #include "bgw_policy/drop_chunks_api.h" #include "bgw_policy/job.h" #include "bgw_policy/reorder_api.h" @@ -93,15 +93,18 @@ CrossModuleFunctions tsl_cm_functions = { .bgw_policy_job_execute = tsl_bgw_policy_job_execute, .continuous_agg_materialize = continuous_agg_materialize, .add_retention_policy = drop_chunks_add_policy, + .remove_retention_policy = drop_chunks_remove_policy, + .policy_compression_add = policy_compression_add, + .policy_compression_proc = policy_compression_proc, + .policy_compression_remove = policy_compression_remove, .policy_reorder_add = policy_reorder_add, .policy_reorder_proc = policy_reorder_proc, .policy_reorder_remove = policy_reorder_remove, - .add_compress_chunks_policy = compress_chunks_add_policy, - .remove_retention_policy = drop_chunks_remove_policy, - .remove_compress_chunks_policy = compress_chunks_remove_policy, .create_upper_paths_hook = tsl_create_upper_paths_hook, .set_rel_pathlist_dml = tsl_set_rel_pathlist_dml, .set_rel_pathlist_query = tsl_set_rel_pathlist_query, + + /* gapfill */ .gapfill_marker = gapfill_marker, .gapfill_int16_time_bucket = gapfill_int16_time_bucket, .gapfill_int32_time_bucket = gapfill_int32_time_bucket, @@ -109,6 +112,7 @@ CrossModuleFunctions tsl_cm_functions = { .gapfill_date_time_bucket = gapfill_date_time_bucket, .gapfill_timestamp_time_bucket = gapfill_timestamp_time_bucket, .gapfill_timestamptz_time_bucket = gapfill_timestamptz_time_bucket, + .alter_job_schedule = bgw_policy_alter_job_schedule, .reorder_chunk = tsl_reorder_chunk, .move_chunk = tsl_move_chunk, diff --git a/tsl/test/expected/compression_bgw.out b/tsl/test/expected/compression_bgw.out index 709ba35d5..d785c03a4 100644 --- a/tsl/test/expected/compression_bgw.out +++ b/tsl/test/expected/compression_bgw.out @@ -31,7 +31,7 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days':: --TEST 1-- --cannot set policy without enabling compression -- \set ON_ERROR_STOP 0 -select add_compress_chunks_policy('conditions', '60d'::interval); +select add_compression_policy('conditions', '60d'::interval); ERROR: can add compress_chunks policy only on hypertables with compression enabled \set ON_ERROR_STOP 1 -- TEST2 -- @@ -40,23 +40,12 @@ alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby NOTICE: adding index _compressed_hypertable_2_location__ts_meta_sequence_num_idx ON _timescaledb_internal._compressed_hypertable_2 USING BTREE(location, _ts_meta_sequence_num) insert into conditions select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75; -select add_compress_chunks_policy('conditions', '60d'::interval); - add_compress_chunks_policy ----------------------------- - 1000 -(1 row) - -select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; - compressjob_id | hypertable_id | older_than -----------------+---------------+------------------ - 1000 | 1 | (t,"@ 60 days",) -(1 row) - +select add_compression_policy('conditions', '60d'::interval) AS compressjob_id \gset -select * from _timescaledb_config.bgw_job where job_type like 'compress%'; - id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config -------+--------------------------------+-----------------+--------------------+-------------+-------------+--------------+-----------+-------------+-------------------+-----------+---------------+-------- - 1000 | Compress Chunks Background Job | compress_chunks | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour | | | default_perm_user | t | 1 | +select * from _timescaledb_config.bgw_job where id = :compressjob_id; + id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config +------+--------------------------------+-----------------+--------------------+-------------+-------------+--------------+--------------------+-----------------------+-------------------+-----------+---------------+------------------------------------------------- + 1000 | Compress Chunks Background Job | compress_chunks | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour | policy_compression | _timescaledb_internal | default_perm_user | t | 1 | {"older_than": "@ 60 days", "hypertable_id": 1} (1 row) select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s'); @@ -66,9 +55,9 @@ select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s'); (1 row) select * from _timescaledb_config.bgw_job where job_type like 'compress%'; - id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config -------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------+-----------+-------------+-------------------+-----------+---------------+-------- - 1000 | Compress Chunks Background Job | compress_chunks | @ 1 sec | @ 0 | -1 | @ 1 hour | | | default_perm_user | t | 1 | + id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config +------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------+--------------------+-----------------------+-------------------+-----------+---------------+------------------------------------------------- + 1000 | Compress Chunks Background Job | compress_chunks | @ 1 sec | @ 0 | -1 | @ 1 hour | policy_compression | _timescaledb_internal | default_perm_user | t | 1 | {"older_than": "@ 60 days", "hypertable_id": 1} (1 row) insert into conditions @@ -90,35 +79,36 @@ select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_b -- TEST 4 -- --cannot set another policy \set ON_ERROR_STOP 0 -select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true); +select add_compression_policy('conditions', '60d'::interval, if_not_exists=>true); NOTICE: compress chunks policy already exists on hypertable "conditions", skipping - add_compress_chunks_policy ----------------------------- - -1 + add_compression_policy +------------------------ + -1 (1 row) -select add_compress_chunks_policy('conditions', '60d'::interval); +select add_compression_policy('conditions', '60d'::interval); ERROR: compress chunks policy already exists for hypertable "conditions" -select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true); +select add_compression_policy('conditions', '30d'::interval, if_not_exists=>true); WARNING: could not add compress_chunks policy due to existing policy on hypertable with different arguments - add_compress_chunks_policy ----------------------------- - -1 + add_compression_policy +------------------------ + -1 (1 row) \set ON_ERROR_STOP 1 --TEST 5 -- -- drop the policy -- -select remove_compress_chunks_policy('conditions'); - remove_compress_chunks_policy -------------------------------- +select remove_compression_policy('conditions'); + remove_compression_policy +--------------------------- t (1 row) -select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; - compressjob_id | hypertable_id | older_than -----------------+---------------+------------ -(0 rows) +select count(*) from _timescaledb_config.bgw_job WHERE id>=1000; + count +------- + 0 +(1 row) --TEST 6 -- -- try to execute the policy after it has been dropped -- @@ -148,17 +138,12 @@ select set_integer_now_func('test_table_int', 'dummy_now'); insert into test_table_int select generate_series(1,5), 10; alter table test_table_int set (timescaledb.compress); -select add_compress_chunks_policy('test_table_int', 2::int); - add_compress_chunks_policy ----------------------------- - 1001 -(1 row) - -select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks -where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int'); - compressjob_id | hypertable_id | older_than -----------------+---------------+------------ - 1001 | 3 | (f,,2) +select add_compression_policy('test_table_int', 2::int) AS compressjob_id +\gset +select * from _timescaledb_config.bgw_job where id=:compressjob_id; + id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config +------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------+--------------------+-----------------------+-------------------+-----------+---------------+--------------------------------------- + 1001 | Compress Chunks Background Job | compress_chunks | @ 1 day | @ 0 | -1 | @ 1 hour | policy_compression | _timescaledb_internal | default_perm_user | t | 3 | {"older_than": 2, "hypertable_id": 3} (1 row) \gset @@ -203,7 +188,7 @@ SELECT set_integer_now_func('test_table_nologin', 'dummy_now'); ALTER TABLE test_table_nologin set (timescaledb.compress); \set ON_ERROR_STOP 0 -SELECT add_compress_chunks_policy('test_table_nologin', 2::int); +SELECT add_compression_policy('test_table_nologin', 2::int); ERROR: permission denied to start compress_chunks background process as role "nologin_role" \set ON_ERROR_STOP 1 RESET ROLE; @@ -260,8 +245,8 @@ SELECT COUNT(*) AS dropped_chunks_count 14 (1 row) -SELECT add_compress_chunks_policy AS job_id - FROM add_compress_chunks_policy('conditions', INTERVAL '1 day') \gset +SELECT add_compression_policy AS job_id + FROM add_compression_policy('conditions', INTERVAL '1 day') \gset SELECT test_compress_chunks_policy(:job_id); test_compress_chunks_policy ----------------------------- diff --git a/tsl/test/expected/compression_ddl.out b/tsl/test/expected/compression_ddl.out index 4760164d1..92325126a 100644 --- a/tsl/test/expected/compression_ddl.out +++ b/tsl/test/expected/compression_ddl.out @@ -452,13 +452,13 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable; (1 row) --add policy to make sure it's dropped later -select add_compress_chunks_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day'); - add_compress_chunks_policy ----------------------------- - 1000 +select add_compression_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day'); + add_compression_policy +------------------------ + 1000 (1 row) -SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks; +SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks'; count ------- 1 @@ -479,7 +479,7 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable_compression; (1 row) --verify that the policy is gone -SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks; +SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks'; count ------- 0 @@ -560,19 +560,18 @@ AS sub; 1 (1 row) -select add_compress_chunks_policy('test1', interval '1 day'); - add_compress_chunks_policy ----------------------------- - 1002 +select add_compression_policy('test1', interval '1 day'); + add_compression_policy +------------------------ + 1002 (1 row) \set ON_ERROR_STOP 0 ALTER table test1 set (timescaledb.compress='f'); -ERROR: cannot change compression options as a compression policy exists on the table \set ON_ERROR_STOP 1 -select remove_compress_chunks_policy('test1'); - remove_compress_chunks_policy -------------------------------- +select remove_compression_policy('test1'); + remove_compression_policy +--------------------------- t (1 row) diff --git a/tsl/test/expected/compression_permissions.out b/tsl/test/expected/compression_permissions.out index 2f2aed013..2e9ef599b 100644 --- a/tsl/test/expected/compression_permissions.out +++ b/tsl/test/expected/compression_permissions.out @@ -39,18 +39,18 @@ ERROR: must be owner of hypertable "conditions" select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name) FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions'; ERROR: must be owner of hypertable "conditions" -select add_compress_chunks_policy('conditions', '1day'::interval); +select add_compression_policy('conditions', '1day'::interval); ERROR: must be owner of hypertable "conditions" \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER -select add_compress_chunks_policy('conditions', '1day'::interval); - add_compress_chunks_policy ----------------------------- - 1000 +select add_compression_policy('conditions', '1day'::interval); + add_compression_policy +------------------------ + 1000 (1 row) \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 --try dropping policy -select remove_compress_chunks_policy('conditions', true); +select remove_compression_policy('conditions', true); ERROR: must be owner of hypertable "conditions" --Tests for GRANTS. -- as owner grant select , compress chunk and check SELECT works diff --git a/tsl/test/sql/compression_bgw.sql b/tsl/test/sql/compression_bgw.sql index a3d589288..e273eb1da 100644 --- a/tsl/test/sql/compression_bgw.sql +++ b/tsl/test/sql/compression_bgw.sql @@ -27,7 +27,7 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days':: --TEST 1-- --cannot set policy without enabling compression -- \set ON_ERROR_STOP 0 -select add_compress_chunks_policy('conditions', '60d'::interval); +select add_compression_policy('conditions', '60d'::interval); \set ON_ERROR_STOP 1 -- TEST2 -- @@ -36,10 +36,10 @@ alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby insert into conditions select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75; -select add_compress_chunks_policy('conditions', '60d'::interval); -select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; +select add_compression_policy('conditions', '60d'::interval) AS compressjob_id \gset -select * from _timescaledb_config.bgw_job where job_type like 'compress%'; + +select * from _timescaledb_config.bgw_job where id = :compressjob_id; select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s'); select * from _timescaledb_config.bgw_job where job_type like 'compress%'; insert into conditions @@ -53,15 +53,15 @@ select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_b -- TEST 4 -- --cannot set another policy \set ON_ERROR_STOP 0 -select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true); -select add_compress_chunks_policy('conditions', '60d'::interval); -select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true); +select add_compression_policy('conditions', '60d'::interval, if_not_exists=>true); +select add_compression_policy('conditions', '60d'::interval); +select add_compression_policy('conditions', '30d'::interval, if_not_exists=>true); \set ON_ERROR_STOP 1 --TEST 5 -- -- drop the policy -- -select remove_compress_chunks_policy('conditions'); -select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks; +select remove_compression_policy('conditions'); +select count(*) from _timescaledb_config.bgw_job WHERE id>=1000; --TEST 6 -- -- try to execute the policy after it has been dropped -- @@ -81,10 +81,10 @@ create or replace function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as select set_integer_now_func('test_table_int', 'dummy_now'); insert into test_table_int select generate_series(1,5), 10; alter table test_table_int set (timescaledb.compress); -select add_compress_chunks_policy('test_table_int', 2::int); +select add_compression_policy('test_table_int', 2::int) AS compressjob_id +\gset -select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks -where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int'); +select * from _timescaledb_config.bgw_job where id=:compressjob_id; \gset select test_compress_chunks_policy(:compressjob_id); select test_compress_chunks_policy(:compressjob_id); @@ -101,7 +101,7 @@ SELECT create_hypertable('test_table_nologin', 'time', chunk_time_interval => 1) SELECT set_integer_now_func('test_table_nologin', 'dummy_now'); ALTER TABLE test_table_nologin set (timescaledb.compress); \set ON_ERROR_STOP 0 -SELECT add_compress_chunks_policy('test_table_nologin', 2::int); +SELECT add_compression_policy('test_table_nologin', 2::int); \set ON_ERROR_STOP 1 RESET ROLE; REVOKE NOLOGIN_ROLE FROM :ROLE_DEFAULT_PERM_USER; @@ -149,6 +149,6 @@ SELECT COUNT(*) AS dropped_chunks_count FROM _timescaledb_catalog.chunk WHERE dropped = TRUE; -SELECT add_compress_chunks_policy AS job_id - FROM add_compress_chunks_policy('conditions', INTERVAL '1 day') \gset +SELECT add_compression_policy AS job_id + FROM add_compression_policy('conditions', INTERVAL '1 day') \gset SELECT test_compress_chunks_policy(:job_id); diff --git a/tsl/test/sql/compression_ddl.sql b/tsl/test/sql/compression_ddl.sql index 60910cf8f..0dc013c63 100644 --- a/tsl/test/sql/compression_ddl.sql +++ b/tsl/test/sql/compression_ddl.sql @@ -301,8 +301,8 @@ WHERE hypertable.table_name like 'test1' ORDER BY hypertable.id LIMIT 1 \gset --before the drop there are 2 hypertables: the compressed and uncompressed ones SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable; --add policy to make sure it's dropped later -select add_compress_chunks_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day'); -SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks; +select add_compression_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day'); +SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks'; DROP TABLE :UNCOMPRESSED_HYPER_NAME; @@ -311,7 +311,7 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable; SELECT count(*) FROM _timescaledb_catalog.hypertable_compression; --verify that the policy is gone -SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks; +SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks'; ROLLBACK; @@ -370,12 +370,12 @@ WHERE hypertable.table_name like 'test1' and chunk.compressed_chunk_id IS NOT NU ) AS sub; -select add_compress_chunks_policy('test1', interval '1 day'); +select add_compression_policy('test1', interval '1 day'); \set ON_ERROR_STOP 0 ALTER table test1 set (timescaledb.compress='f'); \set ON_ERROR_STOP 1 -select remove_compress_chunks_policy('test1'); +select remove_compression_policy('test1'); ALTER table test1 set (timescaledb.compress='f'); --only one hypertable left @@ -403,8 +403,6 @@ WHERE hypertable.table_name like 'test1' and chunk.compressed_chunk_id IS NULL O ) AS sub; - - DROP TABLE test1 CASCADE; DROP TABLESPACE tablespace1; DROP TABLESPACE tablespace2; @@ -447,8 +445,3 @@ FROM _timescaledb_catalog.chunk chunk INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id) WHERE hypertable.table_name like 'test1' ORDER BY chunk.id ) as subq; - - - - - diff --git a/tsl/test/sql/compression_permissions.sql b/tsl/test/sql/compression_permissions.sql index cebe95305..7836a02bd 100644 --- a/tsl/test/sql/compression_permissions.sql +++ b/tsl/test/sql/compression_permissions.sql @@ -35,13 +35,13 @@ select compress_chunk(ch1.schema_name|| '.' || ch1.table_name) FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions' and ch1.compressed_chunk_id IS NULL; select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name) FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions'; -select add_compress_chunks_policy('conditions', '1day'::interval); +select add_compression_policy('conditions', '1day'::interval); \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER -select add_compress_chunks_policy('conditions', '1day'::interval); +select add_compression_policy('conditions', '1day'::interval); \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2 --try dropping policy -select remove_compress_chunks_policy('conditions', true); +select remove_compression_policy('conditions', true); --Tests for GRANTS. -- as owner grant select , compress chunk and check SELECT works diff --git a/tsl/test/src/test_auto_policy.c b/tsl/test/src/test_auto_policy.c index ba3d77fae..f66ac6882 100644 --- a/tsl/test/src/test_auto_policy.c +++ b/tsl/test/src/test_auto_policy.c @@ -52,7 +52,7 @@ ts_test_auto_reorder(PG_FUNCTION_ARGS) "that cannot accept type record"))); } - execute_reorder_policy(job->fd.id, job->fd.config, dummy_reorder_func, false); + policy_reorder_execute(job->fd.id, job->fd.config, dummy_reorder_func, false); values[0] = ObjectIdGetDatum(chunk_oid); values[1] = ObjectIdGetDatum(index_oid); @@ -80,5 +80,5 @@ ts_test_auto_compress_chunks(PG_FUNCTION_ARGS) /*since we didn't come through the scheduler, need to mark job * as started to create a job_stat record */ ts_bgw_job_stat_mark_start(job_id); - return execute_compress_chunks_policy(job); + return policy_compression_execute(job->fd.id, job->fd.config); }