mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 02:23:49 +08:00
Refactor compress chunk policy
This patch changes the compression policy to store its configuration in the bgw_job table and removes the bgw_policy_compress_chunks table.
This commit is contained in:
parent
68aee5144c
commit
0d5f1ffc83
@ -38,6 +38,12 @@ CREATE OR REPLACE FUNCTION add_retention_policy(
|
||||
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_add_retention_policy'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
-- Remove the retention policy from a hypertable
|
||||
CREATE OR REPLACE FUNCTION remove_retention_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
|
||||
AS '@MODULE_PATHNAME@', 'ts_remove_retention_policy'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
/* reorder policy */
|
||||
CREATE OR REPLACE FUNCTION add_reorder_policy(hypertable REGCLASS, index_name NAME, if_not_exists BOOL = false) RETURNS INTEGER
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
@ -50,19 +56,19 @@ CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_reorder(job_id INTEGER,
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc'
|
||||
LANGUAGE C;
|
||||
|
||||
CREATE OR REPLACE FUNCTION add_compress_chunks_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false)
|
||||
/* compression policy */
|
||||
CREATE OR REPLACE FUNCTION add_compression_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false)
|
||||
RETURNS INTEGER
|
||||
AS '@MODULE_PATHNAME@', 'ts_add_compress_chunks_policy'
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
-- Remove the retention policy from a hypertable
|
||||
CREATE OR REPLACE FUNCTION remove_retention_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
|
||||
AS '@MODULE_PATHNAME@', 'ts_remove_retention_policy'
|
||||
CREATE OR REPLACE FUNCTION remove_compression_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_compression_remove'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
CREATE OR REPLACE FUNCTION remove_compress_chunks_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL
|
||||
AS '@MODULE_PATHNAME@', 'ts_remove_compress_chunks_policy'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTEGER, config JSONB)
|
||||
AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc'
|
||||
LANGUAGE C;
|
||||
|
||||
-- Returns the updated job schedule values
|
||||
CREATE OR REPLACE FUNCTION alter_job_schedule(
|
||||
|
@ -364,15 +364,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size (
|
||||
);
|
||||
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');
|
||||
|
||||
CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_policy_compress_chunks(
|
||||
job_id INTEGER PRIMARY KEY REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE,
|
||||
hypertable_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
|
||||
older_than _timescaledb_catalog.ts_interval NOT NULL,
|
||||
CONSTRAINT valid_older_than CHECK(_timescaledb_internal.valid_ts_interval(older_than))
|
||||
);
|
||||
|
||||
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_compress_chunks', '');
|
||||
|
||||
--This stores commit decisions for 2pc remote txns. Abort decisions are never stored.
|
||||
--If a PREPARE TRANSACTION fails for any data node then the entire
|
||||
--frontend transaction will be rolled back and no rows will be stored.
|
||||
|
@ -162,6 +162,23 @@ FROM _timescaledb_config.bgw_policy_reorder reorder
|
||||
WHERE job_type = 'reorder'
|
||||
AND job.id = reorder.job_id;
|
||||
|
||||
-- migrate compression jobs
|
||||
UPDATE
|
||||
_timescaledb_config.bgw_job job
|
||||
SET proc_name = 'policy_compression',
|
||||
proc_schema = '_timescaledb_internal',
|
||||
config = jsonb_build_object('hypertable_id', c.hypertable_id, 'older_than', CASE WHEN (older_than).is_time_interval THEN (older_than).time_interval::text ELSE (older_than).integer_interval::text END),
|
||||
hypertable_id = c.hypertable_id,
|
||||
OWNER = (
|
||||
SELECT relowner::regrole::text
|
||||
FROM _timescaledb_catalog.hypertable ht,
|
||||
pg_class cl
|
||||
WHERE ht.id = c.hypertable_id
|
||||
AND cl.oid = format('%I.%I', schema_name, table_name)::regclass)
|
||||
FROM _timescaledb_config.bgw_policy_compress_chunks c
|
||||
WHERE job_type = 'compress_chunks'
|
||||
AND job.id = c.job_id;
|
||||
|
||||
--rewrite catalog table to not break catalog scans on tables with missingval optimization
|
||||
CLUSTER _timescaledb_config.bgw_job USING bgw_job_pkey;
|
||||
ALTER TABLE _timescaledb_config.bgw_job SET WITHOUT CLUSTER;
|
||||
@ -169,5 +186,10 @@ ALTER TABLE _timescaledb_config.bgw_job SET WITHOUT CLUSTER;
|
||||
CREATE INDEX IF NOT EXISTS bgw_job_proc_hypertable_id_idx ON _timescaledb_config.bgw_job(proc_name,proc_schema,hypertable_id);
|
||||
|
||||
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_config.bgw_policy_reorder;
|
||||
DROP TABLE _timescaledb_config.bgw_policy_reorder CASCADE;
|
||||
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_config.bgw_policy_compress_chunks;
|
||||
DROP TABLE IF EXISTS _timescaledb_config.bgw_policy_reorder CASCADE;
|
||||
DROP TABLE IF EXISTS _timescaledb_config.bgw_policy_compress_chunks CASCADE;
|
||||
|
||||
DROP FUNCTION IF EXISTS add_compress_chunks_policy;
|
||||
DROP FUNCTION IF EXISTS remove_compress_chunks_policy;
|
||||
|
||||
|
@ -68,9 +68,8 @@ WHERE job_type = 'reorder';
|
||||
CREATE OR REPLACE VIEW timescaledb_information.policy_stats as
|
||||
SELECT format('%1$I.%2$I', ht.schema_name, ht.table_name)::regclass as hypertable, p.job_id, j.job_type, js.last_run_success, js.last_finish, js.last_successful_finish, js.last_start, js.next_start,
|
||||
js.total_runs, js.total_failures
|
||||
FROM (SELECT id AS job_id, hypertable_id FROM _timescaledb_config.bgw_job WHERE job_type IN ('reorder')
|
||||
FROM (SELECT id AS job_id, hypertable_id FROM _timescaledb_config.bgw_job WHERE job_type IN ('reorder','compress_chunks')
|
||||
UNION SELECT job_id, hypertable_id FROM _timescaledb_config.bgw_policy_drop_chunks
|
||||
UNION SELECT job_id, hypertable_id FROM _timescaledb_config.bgw_policy_compress_chunks
|
||||
UNION SELECT job_id, raw_hypertable_id FROM _timescaledb_catalog.continuous_agg) p
|
||||
INNER JOIN _timescaledb_catalog.hypertable ht ON p.hypertable_id = ht.id
|
||||
INNER JOIN _timescaledb_config.bgw_job j ON p.job_id = j.id
|
||||
|
@ -32,7 +32,6 @@
|
||||
#include "telemetry/telemetry.h"
|
||||
#include "bgw_policy/chunk_stats.h"
|
||||
#include "bgw_policy/drop_chunks.h"
|
||||
#include "bgw_policy/compress_chunks.h"
|
||||
#include "bgw_policy/policy.h"
|
||||
#include "scan_iterator.h"
|
||||
|
||||
@ -128,14 +127,8 @@ ts_bgw_job_owner(BgwJob *job)
|
||||
|
||||
return ts_rel_get_owner(ts_continuous_agg_get_user_view_oid(ca));
|
||||
}
|
||||
case JOB_TYPE_COMPRESS_CHUNKS:
|
||||
{
|
||||
BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_job(job->fd.id);
|
||||
if (policy == NULL)
|
||||
elog(ERROR, "compress chunks policy for job with id \"%d\" not found", job->fd.id);
|
||||
return ts_rel_get_owner(ts_hypertable_id_to_relid(policy->fd.hypertable_id));
|
||||
}
|
||||
|
||||
case JOB_TYPE_COMPRESS_CHUNKS:
|
||||
case JOB_TYPE_REORDER:
|
||||
case JOB_TYPE_CUSTOM:
|
||||
return get_role_oid(NameStr(job->fd.owner), false);
|
||||
@ -570,7 +563,6 @@ bgw_job_tuple_delete(TupleInfo *ti, void *data)
|
||||
|
||||
/* Delete any policy args associated with this job */
|
||||
ts_bgw_policy_drop_chunks_delete_row_only_by_job_id(job_id);
|
||||
ts_bgw_policy_compress_chunks_delete_row_only_by_job_id(job_id);
|
||||
|
||||
/* Delete any stats in bgw_policy_chunk_stats related to this job */
|
||||
ts_bgw_policy_chunk_stats_delete_row_only_by_job_id(job_id);
|
||||
|
@ -1,6 +1,5 @@
|
||||
set(SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/policy.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/chunk_stats.c
|
||||
)
|
||||
|
@ -1,163 +0,0 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Apache License 2.0.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-APACHE for a copy of the license.
|
||||
*/
|
||||
#include <postgres.h>
|
||||
#include <funcapi.h>
|
||||
|
||||
#include "bgw/job.h"
|
||||
#include "catalog.h"
|
||||
#include "compress_chunks.h"
|
||||
#include "hypertable.h"
|
||||
#include "interval.h"
|
||||
#include "policy.h"
|
||||
#include "scanner.h"
|
||||
#include "scan_iterator.h"
|
||||
#include "utils.h"
|
||||
|
||||
#include "compat.h"
|
||||
|
||||
static ScanTupleResult
|
||||
bgw_policy_compress_chunks_tuple_found(TupleInfo *ti, void *const data)
|
||||
{
|
||||
BgwPolicyCompressChunks **policy = data;
|
||||
bool nulls[Natts_bgw_policy_compress_chunks];
|
||||
Datum values[Natts_bgw_policy_compress_chunks];
|
||||
bool should_free;
|
||||
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
|
||||
|
||||
heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
|
||||
|
||||
*policy = MemoryContextAllocZero(ti->mctx, sizeof(BgwPolicyCompressChunks));
|
||||
Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]);
|
||||
(*policy)->fd.job_id =
|
||||
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]);
|
||||
|
||||
Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]);
|
||||
(*policy)->fd.hypertable_id = DatumGetInt32(
|
||||
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]);
|
||||
|
||||
Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]);
|
||||
|
||||
(*policy)->fd.older_than = *ts_interval_from_tuple(
|
||||
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]);
|
||||
|
||||
if (should_free)
|
||||
heap_freetuple(tuple);
|
||||
|
||||
return SCAN_CONTINUE;
|
||||
}
|
||||
|
||||
static ScanTupleResult
|
||||
compress_policy_delete_row_tuple_found(TupleInfo *ti, void *const data)
|
||||
{
|
||||
CatalogSecurityContext sec_ctx;
|
||||
|
||||
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
|
||||
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
|
||||
ts_catalog_restore_user(&sec_ctx);
|
||||
|
||||
return SCAN_CONTINUE;
|
||||
}
|
||||
|
||||
/* deletes only from the compress_chunks policy table. need to remove the job separately */
|
||||
bool
|
||||
ts_bgw_policy_compress_chunks_delete_row_only_by_job_id(int32 job_id)
|
||||
{
|
||||
ScanKeyData scankey[1];
|
||||
|
||||
ScanKeyInit(&scankey[0],
|
||||
Anum_bgw_policy_compress_chunks_pkey_job_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(job_id));
|
||||
|
||||
return ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_PKEY,
|
||||
scankey,
|
||||
1,
|
||||
compress_policy_delete_row_tuple_found,
|
||||
RowExclusiveLock,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
|
||||
NULL);
|
||||
}
|
||||
|
||||
BgwPolicyCompressChunks *
|
||||
ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id)
|
||||
{
|
||||
ScanKeyData scankey[1];
|
||||
BgwPolicyCompressChunks *ret = NULL;
|
||||
|
||||
ScanKeyInit(&scankey[0],
|
||||
Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(hypertable_id));
|
||||
|
||||
ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY,
|
||||
scankey,
|
||||
1,
|
||||
bgw_policy_compress_chunks_tuple_found,
|
||||
RowExclusiveLock,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
|
||||
(void *) &ret);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy)
|
||||
{
|
||||
TupleDesc tupdesc;
|
||||
CatalogSecurityContext sec_ctx;
|
||||
Datum values[Natts_bgw_policy_compress_chunks];
|
||||
bool nulls[Natts_bgw_policy_compress_chunks] = { false };
|
||||
HeapTuple ht_older_than;
|
||||
Catalog *catalog = ts_catalog_get();
|
||||
Relation rel =
|
||||
table_open(catalog_get_table_id(catalog, BGW_POLICY_COMPRESS_CHUNKS), RowExclusiveLock);
|
||||
|
||||
tupdesc = RelationGetDescr(rel);
|
||||
|
||||
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)] =
|
||||
Int32GetDatum(policy->fd.job_id);
|
||||
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)] =
|
||||
Int32GetDatum(policy->fd.hypertable_id);
|
||||
|
||||
ht_older_than = ts_interval_form_heaptuple(&policy->fd.older_than);
|
||||
|
||||
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)] =
|
||||
HeapTupleGetDatum(ht_older_than);
|
||||
|
||||
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
|
||||
ts_catalog_insert_values(rel, tupdesc, values, nulls);
|
||||
ts_catalog_restore_user(&sec_ctx);
|
||||
heap_freetuple(ht_older_than);
|
||||
table_close(rel, RowExclusiveLock);
|
||||
}
|
||||
|
||||
BgwPolicyCompressChunks *
|
||||
ts_bgw_policy_compress_chunks_find_by_job(int32 job_id)
|
||||
{
|
||||
ScanKeyData scankey[1];
|
||||
BgwPolicyCompressChunks *ret = NULL;
|
||||
|
||||
ScanKeyInit(&scankey[0],
|
||||
Anum_bgw_policy_compress_chunks_pkey_job_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(job_id));
|
||||
|
||||
ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_PKEY,
|
||||
scankey,
|
||||
1,
|
||||
bgw_policy_compress_chunks_tuple_found,
|
||||
RowExclusiveLock,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
|
||||
(void *) &ret);
|
||||
|
||||
return ret;
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Apache License 2.0.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-APACHE for a copy of the license.
|
||||
*/
|
||||
|
||||
#ifndef TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H
|
||||
#define TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H
|
||||
|
||||
#include "catalog.h"
|
||||
#include "export.h"
|
||||
|
||||
typedef struct BgwPolicyCompressChunks
|
||||
{
|
||||
FormData_bgw_policy_compress_chunks fd;
|
||||
} BgwPolicyCompressChunks;
|
||||
|
||||
extern TSDLLEXPORT BgwPolicyCompressChunks *ts_bgw_policy_compress_chunks_find_by_job(int32 job_id);
|
||||
extern TSDLLEXPORT BgwPolicyCompressChunks *
|
||||
ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id);
|
||||
extern TSDLLEXPORT void ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy);
|
||||
extern TSDLLEXPORT bool ts_bgw_policy_compress_chunks_delete_row_only_by_job_id(int32 job_id);
|
||||
#endif /* TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H */
|
@ -7,7 +7,6 @@
|
||||
#include <postgres.h>
|
||||
|
||||
#include "bgw/job.h"
|
||||
#include "bgw_policy/compress_chunks.h"
|
||||
#include "bgw_policy/drop_chunks.h"
|
||||
#include "policy.h"
|
||||
|
||||
@ -28,11 +27,6 @@ ts_bgw_policy_delete_by_hypertable_id(int32 hypertable_id)
|
||||
if (policy)
|
||||
ts_bgw_job_delete_by_id(((BgwPolicyDropChunks *) policy)->job_id);
|
||||
|
||||
policy = ts_bgw_policy_compress_chunks_find_by_hypertable(hypertable_id);
|
||||
|
||||
if (policy)
|
||||
ts_bgw_job_delete_by_id(((BgwPolicyCompressChunks *) policy)->fd.job_id);
|
||||
|
||||
jobs = ts_bgw_job_find_by_hypertable_id(hypertable_id);
|
||||
foreach (lc, jobs)
|
||||
{
|
||||
|
@ -107,10 +107,6 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
|
||||
.schema_name = CATALOG_SCHEMA_NAME,
|
||||
.table_name = COMPRESSION_CHUNK_SIZE_TABLE_NAME,
|
||||
},
|
||||
[BGW_POLICY_COMPRESS_CHUNKS] = {
|
||||
.schema_name = CONFIG_SCHEMA_NAME,
|
||||
.table_name = BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
|
||||
},
|
||||
[REMOTE_TXN] = {
|
||||
.schema_name = CATALOG_SCHEMA_NAME,
|
||||
.table_name = REMOTE_TXN_TABLE_NAME,
|
||||
@ -265,13 +261,6 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
|
||||
[COMPRESSION_CHUNK_SIZE_PKEY] = "compression_chunk_size_pkey",
|
||||
},
|
||||
},
|
||||
[BGW_POLICY_COMPRESS_CHUNKS] = {
|
||||
.length = _MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX,
|
||||
.names = (char *[]) {
|
||||
[BGW_POLICY_COMPRESS_CHUNKS_PKEY] = "bgw_policy_compress_chunks_pkey",
|
||||
[BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY] = "bgw_policy_compress_chunks_hypertable_id_key",
|
||||
},
|
||||
},
|
||||
[REMOTE_TXN] = {
|
||||
.length = _MAX_REMOTE_TXN_INDEX,
|
||||
.names = (char *[]) {
|
||||
@ -300,7 +289,6 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
|
||||
[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG] = NULL,
|
||||
[HYPERTABLE_COMPRESSION] = NULL,
|
||||
[COMPRESSION_CHUNK_SIZE] = NULL,
|
||||
[BGW_POLICY_COMPRESS_CHUNKS] = NULL,
|
||||
[REMOTE_TXN] = NULL,
|
||||
};
|
||||
|
||||
|
@ -55,7 +55,6 @@ typedef enum CatalogTable
|
||||
CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG,
|
||||
HYPERTABLE_COMPRESSION,
|
||||
COMPRESSION_CHUNK_SIZE,
|
||||
BGW_POLICY_COMPRESS_CHUNKS,
|
||||
REMOTE_TXN,
|
||||
_MAX_CATALOG_TABLES,
|
||||
} CatalogTable;
|
||||
@ -1249,50 +1248,6 @@ typedef enum Anum_compression_chunk_size_pkey
|
||||
|
||||
#define Natts_compression_chunk_size_pkey (_Anum_compression_chunk_size_pkey_max - 1)
|
||||
|
||||
#define BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME "bgw_policy_compress_chunks"
|
||||
typedef enum Anum_bgw_policy_compress_chunks
|
||||
{
|
||||
Anum_bgw_policy_compress_chunks_job_id = 1,
|
||||
Anum_bgw_policy_compress_chunks_hypertable_id,
|
||||
Anum_bgw_policy_compress_chunks_older_than,
|
||||
_Anum_bgw_policy_compress_chunks_max,
|
||||
} Anum_bgw_policy_compress_chunks;
|
||||
|
||||
#define Natts_bgw_policy_compress_chunks (_Anum_bgw_policy_compress_chunks_max - 1)
|
||||
|
||||
typedef struct FormData_bgw_policy_compress_chunks
|
||||
{
|
||||
int32 job_id;
|
||||
int32 hypertable_id;
|
||||
FormData_ts_interval older_than;
|
||||
} FormData_bgw_policy_compress_chunks;
|
||||
|
||||
typedef FormData_bgw_policy_compress_chunks *Form_bgw_policy_compress_chunks;
|
||||
|
||||
enum
|
||||
{
|
||||
BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY = 0,
|
||||
BGW_POLICY_COMPRESS_CHUNKS_PKEY,
|
||||
_MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX,
|
||||
};
|
||||
|
||||
typedef enum Anum_bgw_policy_compress_chunks_hypertable_id_key
|
||||
{
|
||||
Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id = 1,
|
||||
_Anum_bgw_policy_compress_chunks_hypertable_id_key_max,
|
||||
} Anum_bgw_policy_compress_chunks_hypertable_id_key;
|
||||
|
||||
#define Natts_bgw_policy_compress_chunks_hypertable_id_key \
|
||||
(_Anum_bgw_policy_compress_chunks_hypertable_id_key_max - 1)
|
||||
|
||||
typedef enum Anum_bgw_policy_compress_chunks_pkey
|
||||
{
|
||||
Anum_bgw_policy_compress_chunks_pkey_job_id = 1,
|
||||
_Anum_bgw_policy_compress_chunks_pkey_max,
|
||||
} Anum_bgw_policy_compress_chunks_pkey;
|
||||
|
||||
#define Natts_bgw_policy_compress_chunks_pkey (_Anum_bgw_policy_compress_chunks_pkey_max - 1)
|
||||
|
||||
/*
|
||||
* The maximum number of indexes a catalog table can have.
|
||||
* This needs to be bumped in case of new catalog tables that have more indexes.
|
||||
|
@ -21,14 +21,15 @@
|
||||
}
|
||||
|
||||
/* bgw policy functions */
|
||||
CROSSMODULE_WRAPPER(add_compress_chunks_policy);
|
||||
CROSSMODULE_WRAPPER(add_retention_policy);
|
||||
CROSSMODULE_WRAPPER(remove_retention_policy);
|
||||
CROSSMODULE_WRAPPER(policy_compression_add);
|
||||
CROSSMODULE_WRAPPER(policy_compression_proc);
|
||||
CROSSMODULE_WRAPPER(policy_compression_remove);
|
||||
CROSSMODULE_WRAPPER(policy_reorder_add);
|
||||
CROSSMODULE_WRAPPER(policy_reorder_proc);
|
||||
CROSSMODULE_WRAPPER(policy_reorder_remove);
|
||||
CROSSMODULE_WRAPPER(alter_job_schedule);
|
||||
CROSSMODULE_WRAPPER(remove_compress_chunks_policy);
|
||||
CROSSMODULE_WRAPPER(remove_retention_policy);
|
||||
|
||||
CROSSMODULE_WRAPPER(reorder_chunk);
|
||||
CROSSMODULE_WRAPPER(move_chunk);
|
||||
@ -337,15 +338,16 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
|
||||
.gapfill_timestamptz_time_bucket = error_no_default_fn_pg_community,
|
||||
|
||||
/* bgw policies */
|
||||
.add_compress_chunks_policy = error_no_default_fn_pg_community,
|
||||
.add_retention_policy = error_no_default_fn_pg_community,
|
||||
.policy_compression_add = error_no_default_fn_pg_community,
|
||||
.policy_compression_proc = error_no_default_fn_pg_community,
|
||||
.policy_compression_remove = error_no_default_fn_pg_community,
|
||||
.policy_reorder_add = error_no_default_fn_pg_community,
|
||||
.policy_reorder_proc = error_no_default_fn_pg_community,
|
||||
.policy_reorder_remove = error_no_default_fn_pg_community,
|
||||
.add_retention_policy = error_no_default_fn_pg_community,
|
||||
.remove_retention_policy = error_no_default_fn_pg_community,
|
||||
.alter_job_schedule = error_no_default_fn_pg_community,
|
||||
.bgw_policy_job_execute = bgw_policy_job_execute_default_fn,
|
||||
.remove_compress_chunks_policy = error_no_default_fn_pg_community,
|
||||
.remove_retention_policy = error_no_default_fn_pg_community,
|
||||
|
||||
.move_chunk = error_no_default_fn_pg_enterprise,
|
||||
.reorder_chunk = error_no_default_fn_pg_community,
|
||||
|
@ -49,12 +49,13 @@ typedef struct CrossModuleFunctions
|
||||
bool (*continuous_agg_materialize)(int32 materialization_id, ContinuousAggMatOptions *options);
|
||||
|
||||
PGFunction add_retention_policy;
|
||||
PGFunction remove_retention_policy;
|
||||
PGFunction policy_compression_add;
|
||||
PGFunction policy_compression_proc;
|
||||
PGFunction policy_compression_remove;
|
||||
PGFunction policy_reorder_add;
|
||||
PGFunction policy_reorder_proc;
|
||||
PGFunction policy_reorder_remove;
|
||||
PGFunction add_compress_chunks_policy;
|
||||
PGFunction remove_retention_policy;
|
||||
PGFunction remove_compress_chunks_policy;
|
||||
|
||||
void (*create_upper_paths_hook)(PlannerInfo *, UpperRelationKind, RelOptInfo *, RelOptInfo *,
|
||||
TsRelType input_reltype, Hypertable *ht, void *extra);
|
||||
|
@ -263,7 +263,7 @@ ts_interval_now_func_validate(Oid now_func_oid, Oid open_dim_type)
|
||||
ReleaseSysCache(tuple);
|
||||
}
|
||||
|
||||
static Datum
|
||||
Datum
|
||||
ts_interval_from_now_func_get_datum(int64 interval, Oid time_dim_type, Oid now_func)
|
||||
{
|
||||
Datum now;
|
||||
@ -314,8 +314,8 @@ noarg_integer_now_func_filter(Form_pg_proc form, void *arg)
|
||||
/* maybe this can be exported later if other parts of the code need
|
||||
* to access the integer_now_func
|
||||
*/
|
||||
static Oid
|
||||
get_integer_now_func(Dimension *open_dim)
|
||||
Oid
|
||||
ts_get_integer_now_func(Dimension *open_dim)
|
||||
{
|
||||
Oid rettype;
|
||||
Oid now_func;
|
||||
@ -344,7 +344,7 @@ ts_get_now_internal(Dimension *open_dim)
|
||||
if (IS_INTEGER_TYPE(dim_post_part_type))
|
||||
{
|
||||
Datum now_datum;
|
||||
Oid now_func = get_integer_now_func(open_dim);
|
||||
Oid now_func = ts_get_integer_now_func(open_dim);
|
||||
ts_interval_now_func_validate(now_func, dim_post_part_type);
|
||||
now_datum = OidFunctionCall0(now_func);
|
||||
return ts_time_value_to_internal(now_datum, dim_post_part_type);
|
||||
@ -430,7 +430,7 @@ ts_interval_subtract_from_now(FormData_ts_interval *invl, Dimension *open_dim)
|
||||
}
|
||||
else
|
||||
{
|
||||
Oid now_func = get_integer_now_func(open_dim);
|
||||
Oid now_func = ts_get_integer_now_func(open_dim);
|
||||
ts_interval_now_func_validate(now_func, type_oid);
|
||||
|
||||
if (InvalidOid == now_func)
|
||||
|
@ -24,4 +24,8 @@ TSDLLEXPORT int64 ts_get_now_internal(Dimension *open_dim);
|
||||
TSDLLEXPORT FormData_ts_interval *
|
||||
ts_interval_from_sql_input_internal(Dimension *open_dim, Datum interval, Oid interval_type,
|
||||
const char *parameter_name, const char *caller_name);
|
||||
TSDLLEXPORT Datum ts_interval_from_now_func_get_datum(int64 interval, Oid time_dim_type,
|
||||
Oid now_func);
|
||||
TSDLLEXPORT Oid ts_get_integer_now_func(Dimension *open_dim);
|
||||
|
||||
#endif /* TIMESCALEDB_INTERVAL */
|
||||
|
@ -113,7 +113,7 @@ ts_jsonb_add_pair(JsonbParseState *state, JsonbValue *key, JsonbValue *value)
|
||||
}
|
||||
|
||||
char *
|
||||
ts_jsonb_get_str_field(Jsonb *jsonb, const char *key)
|
||||
ts_jsonb_get_str_field(const Jsonb *jsonb, const char *key)
|
||||
{
|
||||
/*
|
||||
* `jsonb_object_field_text` returns NULL when the field is not found so
|
||||
@ -136,7 +136,7 @@ ts_jsonb_get_str_field(Jsonb *jsonb, const char *key)
|
||||
}
|
||||
|
||||
TimestampTz
|
||||
ts_jsonb_get_time_field(Jsonb *jsonb, const char *key, bool *field_found)
|
||||
ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key, bool *field_found)
|
||||
{
|
||||
Datum time_datum;
|
||||
char *time_str = ts_jsonb_get_str_field(jsonb, key);
|
||||
@ -157,7 +157,7 @@ ts_jsonb_get_time_field(Jsonb *jsonb, const char *key, bool *field_found)
|
||||
}
|
||||
|
||||
int32
|
||||
ts_jsonb_get_int32_field(Jsonb *json, const char *key, bool *field_found)
|
||||
ts_jsonb_get_int32_field(const Jsonb *json, const char *key, bool *field_found)
|
||||
{
|
||||
Datum int_datum;
|
||||
char *int_str = ts_jsonb_get_str_field(json, key);
|
||||
@ -175,7 +175,7 @@ ts_jsonb_get_int32_field(Jsonb *json, const char *key, bool *field_found)
|
||||
}
|
||||
|
||||
int64
|
||||
ts_jsonb_get_int64_field(Jsonb *json, const char *key, bool *field_found)
|
||||
ts_jsonb_get_int64_field(const Jsonb *json, const char *key, bool *field_found)
|
||||
{
|
||||
Datum int_datum;
|
||||
char *int_str = ts_jsonb_get_str_field(json, key);
|
||||
@ -193,7 +193,7 @@ ts_jsonb_get_int64_field(Jsonb *json, const char *key, bool *field_found)
|
||||
}
|
||||
|
||||
Interval *
|
||||
ts_jsonb_get_interval_field(Jsonb *json, const char *key)
|
||||
ts_jsonb_get_interval_field(const Jsonb *json, const char *key)
|
||||
{
|
||||
Datum interval_datum;
|
||||
char *interval_str = ts_jsonb_get_str_field(json, key);
|
||||
@ -201,7 +201,8 @@ ts_jsonb_get_interval_field(Jsonb *json, const char *key)
|
||||
if (interval_str == NULL)
|
||||
return NULL;
|
||||
|
||||
interval_datum = DirectFunctionCall3(interval_in, CStringGetDatum("1 day"), InvalidOid, -1);
|
||||
interval_datum =
|
||||
DirectFunctionCall3(interval_in, CStringGetDatum(interval_str), InvalidOid, -1);
|
||||
|
||||
return DatumGetIntervalP(interval_datum);
|
||||
}
|
||||
|
@ -26,11 +26,13 @@ extern TSDLLEXPORT void ts_jsonb_add_numeric(JsonbParseState *state, const char
|
||||
|
||||
extern void ts_jsonb_add_value(JsonbParseState *state, const char *key, JsonbValue *value);
|
||||
|
||||
extern TSDLLEXPORT char *ts_jsonb_get_str_field(Jsonb *jsonb, const char *key);
|
||||
extern TSDLLEXPORT Interval *ts_jsonb_get_interval_field(Jsonb *jsonb, const char *key);
|
||||
extern TSDLLEXPORT TimestampTz ts_jsonb_get_time_field(Jsonb *jsonb, const char *key,
|
||||
extern TSDLLEXPORT char *ts_jsonb_get_str_field(const Jsonb *jsonb, const char *key);
|
||||
extern TSDLLEXPORT Interval *ts_jsonb_get_interval_field(const Jsonb *jsonb, const char *key);
|
||||
extern TSDLLEXPORT TimestampTz ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key,
|
||||
bool *field_found);
|
||||
extern TSDLLEXPORT int32 ts_jsonb_get_int32_field(Jsonb *json, const char *key, bool *field_found);
|
||||
extern TSDLLEXPORT int64 ts_jsonb_get_int64_field(Jsonb *json, const char *key, bool *field_found);
|
||||
extern TSDLLEXPORT int32 ts_jsonb_get_int32_field(const Jsonb *json, const char *key,
|
||||
bool *field_found);
|
||||
extern TSDLLEXPORT int64 ts_jsonb_get_int64_field(const Jsonb *json, const char *key,
|
||||
bool *field_found);
|
||||
|
||||
#endif /* TIMESCALEDB_JSONB_UTILS_H */
|
||||
|
@ -19,7 +19,7 @@ WHERE oid IN (
|
||||
ORDER BY proname;
|
||||
proname
|
||||
----------------------------------
|
||||
add_compress_chunks_policy
|
||||
add_compression_policy
|
||||
add_data_node
|
||||
add_dimension
|
||||
add_reorder_policy
|
||||
@ -54,7 +54,7 @@ WHERE oid IN (
|
||||
locf
|
||||
move_chunk
|
||||
refresh_continuous_aggregate
|
||||
remove_compress_chunks_policy
|
||||
remove_compression_policy
|
||||
remove_reorder_policy
|
||||
remove_retention_policy
|
||||
reorder_chunk
|
||||
|
@ -1,7 +1,7 @@
|
||||
set(SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/reorder_api.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks_api.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks_api.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/compression_api.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/job.c
|
||||
)
|
||||
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
|
||||
|
@ -1,15 +0,0 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Timescale License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
|
||||
#ifndef TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H
|
||||
#define TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H
|
||||
|
||||
#include <postgres.h>
|
||||
|
||||
/* User-facing API functions */
|
||||
extern Datum compress_chunks_add_policy(PG_FUNCTION_ARGS);
|
||||
extern Datum compress_chunks_remove_policy(PG_FUNCTION_ARGS);
|
||||
#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H */
|
@ -9,80 +9,140 @@
|
||||
#include <miscadmin.h>
|
||||
#include <utils/builtins.h>
|
||||
|
||||
#include "bgw_policy/compress_chunks.h"
|
||||
#include "bgw/job.h"
|
||||
#include "compress_chunks_api.h"
|
||||
#include "compression_api.h"
|
||||
#include "errors.h"
|
||||
#include "hypertable.h"
|
||||
#include "hypertable_cache.h"
|
||||
#include "interval.h"
|
||||
#include "license.h"
|
||||
#include "utils.h"
|
||||
#include "jsonb_utils.h"
|
||||
#include "bgw_policy/job.h"
|
||||
|
||||
/*
|
||||
* Default scheduled interval for compress jobs = default chunk length.
|
||||
* If this is non-timestamp based hypertable, then default is 1 day
|
||||
*/
|
||||
#define DEFAULT_SCHEDULE_INTERVAL \
|
||||
DatumGetIntervalP(DirectFunctionCall7(make_interval, \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(1), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Float8GetDatum(0)))
|
||||
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 day"), InvalidOid, -1))
|
||||
|
||||
/* Default max runtime is unlimited for compress chunks */
|
||||
#define DEFAULT_MAX_RUNTIME \
|
||||
DatumGetIntervalP(DirectFunctionCall7(make_interval, \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Float8GetDatum(0)))
|
||||
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("0"), InvalidOid, -1))
|
||||
|
||||
/* Right now, there is an infinite number of retries for compress_chunks jobs */
|
||||
#define DEFAULT_MAX_RETRIES -1
|
||||
/* Default retry period for reorder_jobs is currently 1 hour */
|
||||
#define DEFAULT_RETRY_PERIOD \
|
||||
DatumGetIntervalP(DirectFunctionCall7(make_interval, \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(0), \
|
||||
Int32GetDatum(1), \
|
||||
Int32GetDatum(0), \
|
||||
Float8GetDatum(0)))
|
||||
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 hour"), InvalidOid, -1))
|
||||
|
||||
#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
|
||||
#define CONFIG_KEY_HYPERTABLE_ID "hypertable_id"
|
||||
#define CONFIG_KEY_OLDER_THAN "older_than"
|
||||
|
||||
int32
|
||||
policy_compression_get_hypertable_id(const Jsonb *config)
|
||||
{
|
||||
bool found;
|
||||
int32 hypertable_id = ts_jsonb_get_int32_field(config, CONFIG_KEY_HYPERTABLE_ID, &found);
|
||||
|
||||
if (!found)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not find hypertable_id in config for job")));
|
||||
|
||||
return hypertable_id;
|
||||
}
|
||||
|
||||
int64
|
||||
policy_compression_get_older_than_int(const Jsonb *config)
|
||||
{
|
||||
bool found;
|
||||
int32 hypertable_id = ts_jsonb_get_int64_field(config, CONFIG_KEY_OLDER_THAN, &found);
|
||||
|
||||
if (!found)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not find older_than in config for job")));
|
||||
|
||||
return hypertable_id;
|
||||
}
|
||||
|
||||
Interval *
|
||||
policy_compression_get_older_than_interval(const Jsonb *config)
|
||||
{
|
||||
Interval *interval = ts_jsonb_get_interval_field(config, CONFIG_KEY_OLDER_THAN);
|
||||
|
||||
if (interval == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not find older_than in config for job")));
|
||||
|
||||
return interval;
|
||||
}
|
||||
|
||||
static bool
|
||||
compression_lag_equal(Oid partitioning_type, Jsonb *config, Oid lag_type, Datum lag_datum)
|
||||
{
|
||||
if (IS_INTEGER_TYPE(partitioning_type))
|
||||
{
|
||||
int64 config_value = policy_compression_get_older_than_int(config);
|
||||
|
||||
switch (lag_type)
|
||||
{
|
||||
case INT2OID:
|
||||
return config_value == DatumGetInt16(lag_datum);
|
||||
case INT4OID:
|
||||
return config_value == DatumGetInt32(lag_datum);
|
||||
case INT8OID:
|
||||
return config_value == DatumGetInt64(lag_datum);
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (lag_type != INTERVALOID)
|
||||
return false;
|
||||
|
||||
Interval *config_value = policy_compression_get_older_than_interval(config);
|
||||
|
||||
return DatumGetBool(
|
||||
DirectFunctionCall2(interval_eq, IntervalPGetDatum(config_value), lag_datum));
|
||||
}
|
||||
}
|
||||
|
||||
Datum
|
||||
compress_chunks_add_policy(PG_FUNCTION_ARGS)
|
||||
policy_compression_proc(PG_FUNCTION_ARGS)
|
||||
{
|
||||
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
|
||||
PG_RETURN_VOID();
|
||||
|
||||
policy_compression_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
Datum
|
||||
policy_compression_add(PG_FUNCTION_ARGS)
|
||||
{
|
||||
NameData application_name;
|
||||
NameData compress_chunks_name;
|
||||
NameData proc_name, proc_schema, owner;
|
||||
int32 job_id;
|
||||
BgwPolicyCompressChunks *existing;
|
||||
Oid ht_oid = PG_GETARG_OID(0);
|
||||
Datum older_than_datum = PG_GETARG_DATUM(1);
|
||||
Oid older_than_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
bool if_not_exists = PG_GETARG_BOOL(2);
|
||||
Interval *default_schedule_interval = DEFAULT_SCHEDULE_INTERVAL;
|
||||
|
||||
BgwPolicyCompressChunks policy;
|
||||
Hypertable *hypertable;
|
||||
Cache *hcache;
|
||||
Dimension *dim;
|
||||
FormData_ts_interval *older_than;
|
||||
ts_hypertable_permissions_check(ht_oid, GetUserId());
|
||||
Oid owner_id = ts_hypertable_permissions_check(ht_oid, GetUserId());
|
||||
|
||||
older_than = ts_interval_from_sql_input(ht_oid,
|
||||
older_than_datum,
|
||||
older_than_type,
|
||||
"older_than",
|
||||
"compress_chunks_add_policy");
|
||||
|
||||
/* check if this is a table with compression enabled */
|
||||
hypertable = ts_hypertable_cache_get_cache_and_entry(ht_oid, CACHE_FLAG_NONE, &hcache);
|
||||
if (!TS_HYPERTABLE_HAS_COMPRESSION(hypertable))
|
||||
@ -97,9 +157,14 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS)
|
||||
ts_bgw_job_validate_job_owner(owner_id, JOB_TYPE_COMPRESS_CHUNKS);
|
||||
|
||||
/* Make sure that an existing policy doesn't exist on this hypertable */
|
||||
existing = ts_bgw_policy_compress_chunks_find_by_hypertable(hypertable->fd.id);
|
||||
List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_COMPRESSION_PROC_NAME,
|
||||
INTERNAL_SCHEMA_NAME,
|
||||
hypertable->fd.id);
|
||||
|
||||
if (existing != NULL)
|
||||
dim = hyperspace_get_open_dimension(hypertable->space, 0);
|
||||
Oid partitioning_type = ts_dimension_get_partition_type(dim);
|
||||
|
||||
if (jobs != NIL)
|
||||
{
|
||||
if (!if_not_exists)
|
||||
{
|
||||
@ -109,7 +174,12 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS)
|
||||
errmsg("compress chunks policy already exists for hypertable \"%s\"",
|
||||
get_rel_name(ht_oid))));
|
||||
}
|
||||
if (ts_interval_equal(&existing->fd.older_than, older_than))
|
||||
Assert(list_length(jobs) == 1);
|
||||
BgwJob *existing = linitial(jobs);
|
||||
if (compression_lag_equal(partitioning_type,
|
||||
existing->fd.config,
|
||||
older_than_type,
|
||||
older_than_datum))
|
||||
{
|
||||
/* If all arguments are the same, do nothing */
|
||||
ts_cache_release(hcache);
|
||||
@ -127,7 +197,6 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_INT32(-1);
|
||||
}
|
||||
}
|
||||
dim = hyperspace_get_open_dimension(hypertable->space, 0);
|
||||
|
||||
if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)))
|
||||
{
|
||||
@ -138,10 +207,41 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS)
|
||||
/* insert a new job into jobs table */
|
||||
namestrcpy(&application_name, "Compress Chunks Background Job");
|
||||
namestrcpy(&compress_chunks_name, "compress_chunks");
|
||||
namestrcpy(&proc_name, "");
|
||||
namestrcpy(&proc_schema, "");
|
||||
namestrcpy(&proc_name, POLICY_COMPRESSION_PROC_NAME);
|
||||
namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME);
|
||||
namestrcpy(&owner, GetUserNameFromId(owner_id, false));
|
||||
|
||||
JsonbParseState *parse_state = NULL;
|
||||
|
||||
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
|
||||
ts_jsonb_add_int32(parse_state, CONFIG_KEY_HYPERTABLE_ID, hypertable->fd.id);
|
||||
|
||||
switch (older_than_type)
|
||||
{
|
||||
case INTERVALOID:
|
||||
ts_jsonb_add_interval(parse_state,
|
||||
CONFIG_KEY_OLDER_THAN,
|
||||
DatumGetIntervalP(older_than_datum));
|
||||
break;
|
||||
case INT2OID:
|
||||
ts_jsonb_add_int64(parse_state, CONFIG_KEY_OLDER_THAN, DatumGetInt16(older_than_datum));
|
||||
break;
|
||||
case INT4OID:
|
||||
ts_jsonb_add_int64(parse_state, CONFIG_KEY_OLDER_THAN, DatumGetInt32(older_than_datum));
|
||||
break;
|
||||
case INT8OID:
|
||||
ts_jsonb_add_int64(parse_state, CONFIG_KEY_OLDER_THAN, DatumGetInt64(older_than_datum));
|
||||
break;
|
||||
default:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("unsupported datatype for older_than: %s",
|
||||
format_type_be(older_than_type))));
|
||||
}
|
||||
|
||||
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
|
||||
Jsonb *config = JsonbValueToJsonb(result);
|
||||
|
||||
job_id = ts_bgw_job_insert_relation(&application_name,
|
||||
&compress_chunks_name,
|
||||
default_schedule_interval,
|
||||
@ -153,34 +253,26 @@ compress_chunks_add_policy(PG_FUNCTION_ARGS)
|
||||
&owner,
|
||||
true,
|
||||
hypertable->fd.id,
|
||||
NULL);
|
||||
config);
|
||||
|
||||
policy = (BgwPolicyCompressChunks){ .fd = {
|
||||
.job_id = job_id,
|
||||
.hypertable_id = ts_hypertable_relid_to_id(ht_oid),
|
||||
.older_than = *older_than,
|
||||
} };
|
||||
|
||||
/* Now, insert a new row in the compress_chunks args table */
|
||||
ts_bgw_policy_compress_chunks_insert(&policy);
|
||||
ts_cache_release(hcache);
|
||||
|
||||
PG_RETURN_INT32(job_id);
|
||||
}
|
||||
|
||||
Datum
|
||||
compress_chunks_remove_policy(PG_FUNCTION_ARGS)
|
||||
policy_compression_remove(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid hypertable_oid = PG_GETARG_OID(0);
|
||||
bool if_exists = PG_GETARG_BOOL(1);
|
||||
|
||||
/* Remove the job, then remove the policy */
|
||||
int ht_id = ts_hypertable_relid_to_id(hypertable_oid);
|
||||
BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_hypertable(ht_id);
|
||||
|
||||
ts_hypertable_permissions_check(hypertable_oid, GetUserId());
|
||||
|
||||
if (policy == NULL)
|
||||
List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_COMPRESSION_PROC_NAME,
|
||||
INTERNAL_SCHEMA_NAME,
|
||||
ht_id);
|
||||
if (jobs == NIL)
|
||||
{
|
||||
if (!if_exists)
|
||||
ereport(ERROR,
|
||||
@ -195,7 +287,12 @@ compress_chunks_remove_policy(PG_FUNCTION_ARGS)
|
||||
}
|
||||
}
|
||||
|
||||
ts_bgw_job_delete_by_id(policy->fd.job_id);
|
||||
ts_hypertable_permissions_check(hypertable_oid, GetUserId());
|
||||
|
||||
Assert(list_length(jobs) == 1);
|
||||
BgwJob *job = linitial(jobs);
|
||||
|
||||
ts_bgw_job_delete_by_id(job->fd.id);
|
||||
|
||||
PG_RETURN_BOOL(true);
|
||||
}
|
23
tsl/src/bgw_policy/compression_api.h
Normal file
23
tsl/src/bgw_policy/compression_api.h
Normal file
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Timescale License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
|
||||
#ifndef TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H
|
||||
#define TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H
|
||||
|
||||
#include <postgres.h>
|
||||
#include <utils/jsonb.h>
|
||||
#include <utils/timestamp.h>
|
||||
|
||||
/* User-facing API functions */
|
||||
extern Datum policy_compression_add(PG_FUNCTION_ARGS);
|
||||
extern Datum policy_compression_remove(PG_FUNCTION_ARGS);
|
||||
extern Datum policy_compression_proc(PG_FUNCTION_ARGS);
|
||||
|
||||
int32 policy_compression_get_hypertable_id(const Jsonb *config);
|
||||
int64 policy_compression_get_older_than_int(const Jsonb *config);
|
||||
Interval *policy_compression_get_older_than_interval(const Jsonb *config);
|
||||
|
||||
#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H */
|
@ -25,8 +25,8 @@
|
||||
#include "bgw/job_stat.h"
|
||||
#include "bgw_policy/chunk_stats.h"
|
||||
#include "bgw_policy/drop_chunks.h"
|
||||
#include "bgw_policy/compress_chunks.h"
|
||||
#include "bgw_policy/reorder_api.h"
|
||||
#include "bgw_policy/compression_api.h"
|
||||
#include "compression/compress_utils.h"
|
||||
#include "continuous_aggs/materialize.h"
|
||||
#include "continuous_aggs/job.h"
|
||||
@ -89,14 +89,65 @@ get_chunk_id_to_reorder(int32 job_id, Hypertable *ht)
|
||||
-1);
|
||||
}
|
||||
|
||||
static int64
|
||||
get_compression_window_end_value(Dimension *dim, const Jsonb *config)
|
||||
{
|
||||
Oid partitioning_type = ts_dimension_get_partition_type(dim);
|
||||
|
||||
if (IS_INTEGER_TYPE(partitioning_type))
|
||||
{
|
||||
int64 lag = policy_compression_get_older_than_int(config);
|
||||
Oid now_func = ts_get_integer_now_func(dim);
|
||||
|
||||
if (InvalidOid == now_func)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("integer_now function must be set")));
|
||||
|
||||
return ts_time_value_to_internal(ts_interval_from_now_func_get_datum(lag,
|
||||
partitioning_type,
|
||||
now_func),
|
||||
partitioning_type);
|
||||
}
|
||||
else
|
||||
{
|
||||
Datum res = TimestampTzGetDatum(GetCurrentTimestamp());
|
||||
Interval *lag = policy_compression_get_older_than_interval(config);
|
||||
|
||||
switch (partitioning_type)
|
||||
{
|
||||
case TIMESTAMPOID:
|
||||
res = DirectFunctionCall1(timestamptz_timestamp, res);
|
||||
res = DirectFunctionCall2(timestamp_mi_interval, res, IntervalPGetDatum(lag));
|
||||
|
||||
return ts_time_value_to_internal(res, partitioning_type);
|
||||
case TIMESTAMPTZOID:
|
||||
res = DirectFunctionCall2(timestamptz_mi_interval, res, IntervalPGetDatum(lag));
|
||||
|
||||
return ts_time_value_to_internal(res, partitioning_type);
|
||||
case DATEOID:
|
||||
res = DirectFunctionCall1(timestamptz_timestamp, res);
|
||||
res = DirectFunctionCall2(timestamp_mi_interval, res, IntervalPGetDatum(lag));
|
||||
res = DirectFunctionCall1(timestamp_date, res);
|
||||
|
||||
return ts_time_value_to_internal(res, partitioning_type);
|
||||
default:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("unknown time type OID %d", partitioning_type)));
|
||||
pg_unreachable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32
|
||||
get_chunk_to_compress(Hypertable *ht, FormData_ts_interval *older_than)
|
||||
get_chunk_to_compress(Hypertable *ht, const Jsonb *config)
|
||||
{
|
||||
Dimension *open_dim = hyperspace_get_open_dimension(ht->space, 0);
|
||||
StrategyNumber end_strategy = BTLessStrategyNumber;
|
||||
Oid partitioning_type = ts_dimension_get_partition_type(open_dim);
|
||||
int64 end_value = ts_time_value_to_internal(ts_interval_subtract_from_now(older_than, open_dim),
|
||||
partitioning_type);
|
||||
|
||||
int64 end_value = get_compression_window_end_value(open_dim, config);
|
||||
|
||||
return ts_dimension_slice_get_chunkid_to_compress(open_dim->fd.id,
|
||||
InvalidStrategy, /*start_strategy*/
|
||||
-1, /*start_value*/
|
||||
@ -105,7 +156,7 @@ get_chunk_to_compress(Hypertable *ht, FormData_ts_interval *older_than)
|
||||
}
|
||||
|
||||
bool
|
||||
execute_reorder_policy(int32 job_id, Jsonb *config, reorder_func reorder, bool fast_continue)
|
||||
policy_reorder_execute(int32 job_id, Jsonb *config, reorder_func reorder, bool fast_continue)
|
||||
{
|
||||
int chunk_id;
|
||||
bool started = false;
|
||||
@ -156,8 +207,10 @@ execute_reorder_policy(int32 job_id, Jsonb *config, reorder_func reorder, bool f
|
||||
|
||||
if (fast_continue && get_chunk_id_to_reorder(job_id, ht) != -1)
|
||||
{
|
||||
BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, true);
|
||||
enable_fast_restart(job, "reorder");
|
||||
BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, false);
|
||||
|
||||
if (job != NULL)
|
||||
enable_fast_restart(job, "reorder");
|
||||
}
|
||||
|
||||
commit:
|
||||
@ -289,16 +342,14 @@ execute_materialize_continuous_aggregate(BgwJob *job)
|
||||
}
|
||||
|
||||
bool
|
||||
execute_compress_chunks_policy(BgwJob *job)
|
||||
policy_compression_execute(int32 job_id, Jsonb *config)
|
||||
{
|
||||
bool started = false;
|
||||
BgwPolicyCompressChunks *args;
|
||||
Oid table_relid;
|
||||
Hypertable *ht;
|
||||
Cache *hcache;
|
||||
int32 chunkid;
|
||||
Chunk *chunk = NULL;
|
||||
int job_id = job->fd.id;
|
||||
|
||||
if (!IsTransactionOrTransactionBlock())
|
||||
{
|
||||
@ -307,19 +358,10 @@ execute_compress_chunks_policy(BgwJob *job)
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
}
|
||||
|
||||
/* Get the arguments from the compress_chunks_policy table */
|
||||
args = ts_bgw_policy_compress_chunks_find_by_job(job_id);
|
||||
|
||||
if (args == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_TS_INTERNAL_ERROR),
|
||||
errmsg("could not run compress_chunks policy #%d because no args in policy table",
|
||||
job_id)));
|
||||
|
||||
table_relid = ts_hypertable_id_to_relid(args->fd.hypertable_id);
|
||||
table_relid = ts_hypertable_id_to_relid(policy_compression_get_hypertable_id(config));
|
||||
ht = ts_hypertable_cache_get_cache_and_entry(table_relid, CACHE_FLAG_NONE, &hcache);
|
||||
|
||||
chunkid = get_chunk_to_compress(ht, &args->fd.older_than);
|
||||
chunkid = get_chunk_to_compress(ht, config);
|
||||
if (chunkid == INVALID_CHUNK_ID)
|
||||
{
|
||||
elog(NOTICE,
|
||||
@ -338,9 +380,14 @@ execute_compress_chunks_policy(BgwJob *job)
|
||||
NameStr(chunk->fd.table_name));
|
||||
}
|
||||
|
||||
chunkid = get_chunk_to_compress(ht, &args->fd.older_than);
|
||||
chunkid = get_chunk_to_compress(ht, config);
|
||||
if (chunkid != INVALID_CHUNK_ID)
|
||||
enable_fast_restart(job, "compress_chunks");
|
||||
{
|
||||
BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, false);
|
||||
|
||||
if (job != NULL)
|
||||
enable_fast_restart(job, "compression");
|
||||
}
|
||||
|
||||
ts_cache_release(hcache);
|
||||
if (started)
|
||||
@ -479,9 +526,8 @@ tsl_bgw_policy_job_execute(BgwJob *job)
|
||||
return execute_drop_chunks_policy(job->fd.id);
|
||||
case JOB_TYPE_CONTINUOUS_AGGREGATE:
|
||||
return execute_materialize_continuous_aggregate(job);
|
||||
case JOB_TYPE_COMPRESS_CHUNKS:
|
||||
return execute_compress_chunks_policy(job);
|
||||
|
||||
case JOB_TYPE_COMPRESS_CHUNKS:
|
||||
case JOB_TYPE_REORDER:
|
||||
case JOB_TYPE_CUSTOM:
|
||||
return job_execute(job);
|
||||
|
@ -18,10 +18,10 @@ typedef void (*reorder_func)(Oid tableOid, Oid indexOid, bool verbose, Oid wait_
|
||||
Oid destination_tablespace, Oid index_tablespace);
|
||||
|
||||
/* Functions exposed only for testing */
|
||||
extern bool execute_reorder_policy(int32 job_id, Jsonb *config, reorder_func reorder,
|
||||
extern bool policy_reorder_execute(int32 job_id, Jsonb *config, reorder_func reorder,
|
||||
bool fast_continue);
|
||||
extern bool execute_drop_chunks_policy(int32 job_id);
|
||||
extern bool execute_compress_chunks_policy(BgwJob *job);
|
||||
extern bool policy_compression_execute(int32 job_id, Jsonb *config);
|
||||
extern bool tsl_bgw_policy_job_execute(BgwJob *job);
|
||||
extern Datum bgw_policy_alter_job_schedule(PG_FUNCTION_ARGS);
|
||||
|
||||
|
@ -52,7 +52,7 @@
|
||||
#define POLICY_REORDER_PROC_NAME "policy_reorder"
|
||||
|
||||
int32
|
||||
policy_reorder_get_hypertable_id(Jsonb *config)
|
||||
policy_reorder_get_hypertable_id(const Jsonb *config)
|
||||
{
|
||||
bool found;
|
||||
int32 hypertable_id = ts_jsonb_get_int32_field(config, CONFIG_KEY_HYPERTABLE_ID, &found);
|
||||
@ -66,7 +66,7 @@ policy_reorder_get_hypertable_id(Jsonb *config)
|
||||
}
|
||||
|
||||
char *
|
||||
policy_reorder_get_index_name(Jsonb *config)
|
||||
policy_reorder_get_index_name(const Jsonb *config)
|
||||
{
|
||||
char *index_name = NULL;
|
||||
|
||||
@ -108,10 +108,10 @@ check_valid_index(Hypertable *ht, Name index_name)
|
||||
Datum
|
||||
policy_reorder_proc(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 job_id = PG_GETARG_INT32(0);
|
||||
Jsonb *config = PG_GETARG_JSONB_P(1);
|
||||
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
|
||||
PG_RETURN_VOID();
|
||||
|
||||
execute_reorder_policy(job_id, config, reorder_chunk, true);
|
||||
policy_reorder_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1), reorder_chunk, true);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
@ -235,7 +235,6 @@ policy_reorder_remove(PG_FUNCTION_ARGS)
|
||||
Oid hypertable_oid = PG_GETARG_OID(0);
|
||||
bool if_exists = PG_GETARG_BOOL(1);
|
||||
|
||||
/* Remove the job, then remove the policy */
|
||||
int ht_id = ts_hypertable_relid_to_id(hypertable_oid);
|
||||
|
||||
List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REORDER_PROC_NAME,
|
||||
|
@ -14,7 +14,7 @@ extern Datum policy_reorder_add(PG_FUNCTION_ARGS);
|
||||
extern Datum policy_reorder_remove(PG_FUNCTION_ARGS);
|
||||
extern Datum policy_reorder_proc(PG_FUNCTION_ARGS);
|
||||
|
||||
extern int32 policy_reorder_get_hypertable_id(Jsonb *config);
|
||||
extern char *policy_reorder_get_index_name(Jsonb *config);
|
||||
extern int32 policy_reorder_get_hypertable_id(const Jsonb *config);
|
||||
extern char *policy_reorder_get_index_name(const Jsonb *config);
|
||||
|
||||
#endif /* TIMESCALEDB_TSL_BGW_POLICY_REORDER_API_H */
|
||||
|
@ -42,7 +42,6 @@
|
||||
#include "license.h"
|
||||
#include "trigger.h"
|
||||
#include "utils.h"
|
||||
#include "bgw_policy/compress_chunks.h"
|
||||
|
||||
/* entrypoint
|
||||
* tsl_process_compress_table : is the entry point.
|
||||
@ -815,13 +814,10 @@ static void
|
||||
check_modify_compression_options(Hypertable *ht, WithClauseResult *with_clause_options)
|
||||
{
|
||||
bool compress_enable = DatumGetBool(with_clause_options[CompressEnabled].parsed);
|
||||
bool compression_has_policy;
|
||||
bool compressed_chunks_exist;
|
||||
bool compression_already_enabled = TS_HYPERTABLE_HAS_COMPRESSION(ht);
|
||||
compressed_chunks_exist =
|
||||
compression_already_enabled && ts_chunk_exists_with_compression(ht->fd.id);
|
||||
compression_has_policy =
|
||||
compression_already_enabled && ts_bgw_policy_compress_chunks_find_by_hypertable(ht->fd.id);
|
||||
|
||||
if (compressed_chunks_exist)
|
||||
ereport(ERROR,
|
||||
@ -829,12 +825,6 @@ check_modify_compression_options(Hypertable *ht, WithClauseResult *with_clause_o
|
||||
errmsg("cannot change compression options as compressed chunks already exist for "
|
||||
"this table")));
|
||||
|
||||
if (compression_has_policy)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot change compression options as a compression policy exists on the "
|
||||
"table")));
|
||||
|
||||
/* Require both order by and segment by when altering if they were previously set because
|
||||
* otherwise it's not clear what the default value means: does it mean leave as-is or is it an
|
||||
* empty list. */
|
||||
|
@ -6,7 +6,7 @@
|
||||
#include <postgres.h>
|
||||
#include <fmgr.h>
|
||||
|
||||
#include "bgw_policy/compress_chunks_api.h"
|
||||
#include "bgw_policy/compression_api.h"
|
||||
#include "bgw_policy/drop_chunks_api.h"
|
||||
#include "bgw_policy/job.h"
|
||||
#include "bgw_policy/reorder_api.h"
|
||||
@ -93,15 +93,18 @@ CrossModuleFunctions tsl_cm_functions = {
|
||||
.bgw_policy_job_execute = tsl_bgw_policy_job_execute,
|
||||
.continuous_agg_materialize = continuous_agg_materialize,
|
||||
.add_retention_policy = drop_chunks_add_policy,
|
||||
.remove_retention_policy = drop_chunks_remove_policy,
|
||||
.policy_compression_add = policy_compression_add,
|
||||
.policy_compression_proc = policy_compression_proc,
|
||||
.policy_compression_remove = policy_compression_remove,
|
||||
.policy_reorder_add = policy_reorder_add,
|
||||
.policy_reorder_proc = policy_reorder_proc,
|
||||
.policy_reorder_remove = policy_reorder_remove,
|
||||
.add_compress_chunks_policy = compress_chunks_add_policy,
|
||||
.remove_retention_policy = drop_chunks_remove_policy,
|
||||
.remove_compress_chunks_policy = compress_chunks_remove_policy,
|
||||
.create_upper_paths_hook = tsl_create_upper_paths_hook,
|
||||
.set_rel_pathlist_dml = tsl_set_rel_pathlist_dml,
|
||||
.set_rel_pathlist_query = tsl_set_rel_pathlist_query,
|
||||
|
||||
/* gapfill */
|
||||
.gapfill_marker = gapfill_marker,
|
||||
.gapfill_int16_time_bucket = gapfill_int16_time_bucket,
|
||||
.gapfill_int32_time_bucket = gapfill_int32_time_bucket,
|
||||
@ -109,6 +112,7 @@ CrossModuleFunctions tsl_cm_functions = {
|
||||
.gapfill_date_time_bucket = gapfill_date_time_bucket,
|
||||
.gapfill_timestamp_time_bucket = gapfill_timestamp_time_bucket,
|
||||
.gapfill_timestamptz_time_bucket = gapfill_timestamptz_time_bucket,
|
||||
|
||||
.alter_job_schedule = bgw_policy_alter_job_schedule,
|
||||
.reorder_chunk = tsl_reorder_chunk,
|
||||
.move_chunk = tsl_move_chunk,
|
||||
|
@ -31,7 +31,7 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::
|
||||
--TEST 1--
|
||||
--cannot set policy without enabling compression --
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval);
|
||||
select add_compression_policy('conditions', '60d'::interval);
|
||||
ERROR: can add compress_chunks policy only on hypertables with compression enabled
|
||||
\set ON_ERROR_STOP 1
|
||||
-- TEST2 --
|
||||
@ -40,23 +40,12 @@ alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby
|
||||
NOTICE: adding index _compressed_hypertable_2_location__ts_meta_sequence_num_idx ON _timescaledb_internal._compressed_hypertable_2 USING BTREE(location, _ts_meta_sequence_num)
|
||||
insert into conditions
|
||||
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval);
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
1000
|
||||
(1 row)
|
||||
|
||||
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
|
||||
compressjob_id | hypertable_id | older_than
|
||||
----------------+---------------+------------------
|
||||
1000 | 1 | (t,"@ 60 days",)
|
||||
(1 row)
|
||||
|
||||
select add_compression_policy('conditions', '60d'::interval) AS compressjob_id
|
||||
\gset
|
||||
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
|
||||
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config
|
||||
------+--------------------------------+-----------------+--------------------+-------------+-------------+--------------+-----------+-------------+-------------------+-----------+---------------+--------
|
||||
1000 | Compress Chunks Background Job | compress_chunks | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour | | | default_perm_user | t | 1 |
|
||||
select * from _timescaledb_config.bgw_job where id = :compressjob_id;
|
||||
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config
|
||||
------+--------------------------------+-----------------+--------------------+-------------+-------------+--------------+--------------------+-----------------------+-------------------+-----------+---------------+-------------------------------------------------
|
||||
1000 | Compress Chunks Background Job | compress_chunks | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour | policy_compression | _timescaledb_internal | default_perm_user | t | 1 | {"older_than": "@ 60 days", "hypertable_id": 1}
|
||||
(1 row)
|
||||
|
||||
select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s');
|
||||
@ -66,9 +55,9 @@ select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s');
|
||||
(1 row)
|
||||
|
||||
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
|
||||
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config
|
||||
------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------+-----------+-------------+-------------------+-----------+---------------+--------
|
||||
1000 | Compress Chunks Background Job | compress_chunks | @ 1 sec | @ 0 | -1 | @ 1 hour | | | default_perm_user | t | 1 |
|
||||
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config
|
||||
------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------+--------------------+-----------------------+-------------------+-----------+---------------+-------------------------------------------------
|
||||
1000 | Compress Chunks Background Job | compress_chunks | @ 1 sec | @ 0 | -1 | @ 1 hour | policy_compression | _timescaledb_internal | default_perm_user | t | 1 | {"older_than": "@ 60 days", "hypertable_id": 1}
|
||||
(1 row)
|
||||
|
||||
insert into conditions
|
||||
@ -90,35 +79,36 @@ select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_b
|
||||
-- TEST 4 --
|
||||
--cannot set another policy
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true);
|
||||
select add_compression_policy('conditions', '60d'::interval, if_not_exists=>true);
|
||||
NOTICE: compress chunks policy already exists on hypertable "conditions", skipping
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
-1
|
||||
add_compression_policy
|
||||
------------------------
|
||||
-1
|
||||
(1 row)
|
||||
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval);
|
||||
select add_compression_policy('conditions', '60d'::interval);
|
||||
ERROR: compress chunks policy already exists for hypertable "conditions"
|
||||
select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true);
|
||||
select add_compression_policy('conditions', '30d'::interval, if_not_exists=>true);
|
||||
WARNING: could not add compress_chunks policy due to existing policy on hypertable with different arguments
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
-1
|
||||
add_compression_policy
|
||||
------------------------
|
||||
-1
|
||||
(1 row)
|
||||
|
||||
\set ON_ERROR_STOP 1
|
||||
--TEST 5 --
|
||||
-- drop the policy --
|
||||
select remove_compress_chunks_policy('conditions');
|
||||
remove_compress_chunks_policy
|
||||
-------------------------------
|
||||
select remove_compression_policy('conditions');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
|
||||
compressjob_id | hypertable_id | older_than
|
||||
----------------+---------------+------------
|
||||
(0 rows)
|
||||
select count(*) from _timescaledb_config.bgw_job WHERE id>=1000;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
--TEST 6 --
|
||||
-- try to execute the policy after it has been dropped --
|
||||
@ -148,17 +138,12 @@ select set_integer_now_func('test_table_int', 'dummy_now');
|
||||
|
||||
insert into test_table_int select generate_series(1,5), 10;
|
||||
alter table test_table_int set (timescaledb.compress);
|
||||
select add_compress_chunks_policy('test_table_int', 2::int);
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
1001
|
||||
(1 row)
|
||||
|
||||
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks
|
||||
where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int');
|
||||
compressjob_id | hypertable_id | older_than
|
||||
----------------+---------------+------------
|
||||
1001 | 3 | (f,,2)
|
||||
select add_compression_policy('test_table_int', 2::int) AS compressjob_id
|
||||
\gset
|
||||
select * from _timescaledb_config.bgw_job where id=:compressjob_id;
|
||||
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period | proc_name | proc_schema | owner | scheduled | hypertable_id | config
|
||||
------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------+--------------------+-----------------------+-------------------+-----------+---------------+---------------------------------------
|
||||
1001 | Compress Chunks Background Job | compress_chunks | @ 1 day | @ 0 | -1 | @ 1 hour | policy_compression | _timescaledb_internal | default_perm_user | t | 3 | {"older_than": 2, "hypertable_id": 3}
|
||||
(1 row)
|
||||
|
||||
\gset
|
||||
@ -203,7 +188,7 @@ SELECT set_integer_now_func('test_table_nologin', 'dummy_now');
|
||||
|
||||
ALTER TABLE test_table_nologin set (timescaledb.compress);
|
||||
\set ON_ERROR_STOP 0
|
||||
SELECT add_compress_chunks_policy('test_table_nologin', 2::int);
|
||||
SELECT add_compression_policy('test_table_nologin', 2::int);
|
||||
ERROR: permission denied to start compress_chunks background process as role "nologin_role"
|
||||
\set ON_ERROR_STOP 1
|
||||
RESET ROLE;
|
||||
@ -260,8 +245,8 @@ SELECT COUNT(*) AS dropped_chunks_count
|
||||
14
|
||||
(1 row)
|
||||
|
||||
SELECT add_compress_chunks_policy AS job_id
|
||||
FROM add_compress_chunks_policy('conditions', INTERVAL '1 day') \gset
|
||||
SELECT add_compression_policy AS job_id
|
||||
FROM add_compression_policy('conditions', INTERVAL '1 day') \gset
|
||||
SELECT test_compress_chunks_policy(:job_id);
|
||||
test_compress_chunks_policy
|
||||
-----------------------------
|
||||
|
@ -452,13 +452,13 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
|
||||
(1 row)
|
||||
|
||||
--add policy to make sure it's dropped later
|
||||
select add_compress_chunks_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day');
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
1000
|
||||
select add_compression_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day');
|
||||
add_compression_policy
|
||||
------------------------
|
||||
1000
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks;
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks';
|
||||
count
|
||||
-------
|
||||
1
|
||||
@ -479,7 +479,7 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable_compression;
|
||||
(1 row)
|
||||
|
||||
--verify that the policy is gone
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks;
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks';
|
||||
count
|
||||
-------
|
||||
0
|
||||
@ -560,19 +560,18 @@ AS sub;
|
||||
1
|
||||
(1 row)
|
||||
|
||||
select add_compress_chunks_policy('test1', interval '1 day');
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
1002
|
||||
select add_compression_policy('test1', interval '1 day');
|
||||
add_compression_policy
|
||||
------------------------
|
||||
1002
|
||||
(1 row)
|
||||
|
||||
\set ON_ERROR_STOP 0
|
||||
ALTER table test1 set (timescaledb.compress='f');
|
||||
ERROR: cannot change compression options as a compression policy exists on the table
|
||||
\set ON_ERROR_STOP 1
|
||||
select remove_compress_chunks_policy('test1');
|
||||
remove_compress_chunks_policy
|
||||
-------------------------------
|
||||
select remove_compression_policy('test1');
|
||||
remove_compression_policy
|
||||
---------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
|
@ -39,18 +39,18 @@ ERROR: must be owner of hypertable "conditions"
|
||||
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
|
||||
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions';
|
||||
ERROR: must be owner of hypertable "conditions"
|
||||
select add_compress_chunks_policy('conditions', '1day'::interval);
|
||||
select add_compression_policy('conditions', '1day'::interval);
|
||||
ERROR: must be owner of hypertable "conditions"
|
||||
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
|
||||
select add_compress_chunks_policy('conditions', '1day'::interval);
|
||||
add_compress_chunks_policy
|
||||
----------------------------
|
||||
1000
|
||||
select add_compression_policy('conditions', '1day'::interval);
|
||||
add_compression_policy
|
||||
------------------------
|
||||
1000
|
||||
(1 row)
|
||||
|
||||
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
|
||||
--try dropping policy
|
||||
select remove_compress_chunks_policy('conditions', true);
|
||||
select remove_compression_policy('conditions', true);
|
||||
ERROR: must be owner of hypertable "conditions"
|
||||
--Tests for GRANTS.
|
||||
-- as owner grant select , compress chunk and check SELECT works
|
||||
|
@ -27,7 +27,7 @@ select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::
|
||||
--TEST 1--
|
||||
--cannot set policy without enabling compression --
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval);
|
||||
select add_compression_policy('conditions', '60d'::interval);
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
-- TEST2 --
|
||||
@ -36,10 +36,10 @@ alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby
|
||||
insert into conditions
|
||||
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
|
||||
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval);
|
||||
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
|
||||
select add_compression_policy('conditions', '60d'::interval) AS compressjob_id
|
||||
\gset
|
||||
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
|
||||
|
||||
select * from _timescaledb_config.bgw_job where id = :compressjob_id;
|
||||
select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s');
|
||||
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
|
||||
insert into conditions
|
||||
@ -53,15 +53,15 @@ select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_b
|
||||
-- TEST 4 --
|
||||
--cannot set another policy
|
||||
\set ON_ERROR_STOP 0
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true);
|
||||
select add_compress_chunks_policy('conditions', '60d'::interval);
|
||||
select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true);
|
||||
select add_compression_policy('conditions', '60d'::interval, if_not_exists=>true);
|
||||
select add_compression_policy('conditions', '60d'::interval);
|
||||
select add_compression_policy('conditions', '30d'::interval, if_not_exists=>true);
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
--TEST 5 --
|
||||
-- drop the policy --
|
||||
select remove_compress_chunks_policy('conditions');
|
||||
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
|
||||
select remove_compression_policy('conditions');
|
||||
select count(*) from _timescaledb_config.bgw_job WHERE id>=1000;
|
||||
|
||||
--TEST 6 --
|
||||
-- try to execute the policy after it has been dropped --
|
||||
@ -81,10 +81,10 @@ create or replace function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as
|
||||
select set_integer_now_func('test_table_int', 'dummy_now');
|
||||
insert into test_table_int select generate_series(1,5), 10;
|
||||
alter table test_table_int set (timescaledb.compress);
|
||||
select add_compress_chunks_policy('test_table_int', 2::int);
|
||||
select add_compression_policy('test_table_int', 2::int) AS compressjob_id
|
||||
\gset
|
||||
|
||||
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks
|
||||
where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int');
|
||||
select * from _timescaledb_config.bgw_job where id=:compressjob_id;
|
||||
\gset
|
||||
select test_compress_chunks_policy(:compressjob_id);
|
||||
select test_compress_chunks_policy(:compressjob_id);
|
||||
@ -101,7 +101,7 @@ SELECT create_hypertable('test_table_nologin', 'time', chunk_time_interval => 1)
|
||||
SELECT set_integer_now_func('test_table_nologin', 'dummy_now');
|
||||
ALTER TABLE test_table_nologin set (timescaledb.compress);
|
||||
\set ON_ERROR_STOP 0
|
||||
SELECT add_compress_chunks_policy('test_table_nologin', 2::int);
|
||||
SELECT add_compression_policy('test_table_nologin', 2::int);
|
||||
\set ON_ERROR_STOP 1
|
||||
RESET ROLE;
|
||||
REVOKE NOLOGIN_ROLE FROM :ROLE_DEFAULT_PERM_USER;
|
||||
@ -149,6 +149,6 @@ SELECT COUNT(*) AS dropped_chunks_count
|
||||
FROM _timescaledb_catalog.chunk
|
||||
WHERE dropped = TRUE;
|
||||
|
||||
SELECT add_compress_chunks_policy AS job_id
|
||||
FROM add_compress_chunks_policy('conditions', INTERVAL '1 day') \gset
|
||||
SELECT add_compression_policy AS job_id
|
||||
FROM add_compression_policy('conditions', INTERVAL '1 day') \gset
|
||||
SELECT test_compress_chunks_policy(:job_id);
|
||||
|
@ -301,8 +301,8 @@ WHERE hypertable.table_name like 'test1' ORDER BY hypertable.id LIMIT 1 \gset
|
||||
--before the drop there are 2 hypertables: the compressed and uncompressed ones
|
||||
SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
|
||||
--add policy to make sure it's dropped later
|
||||
select add_compress_chunks_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day');
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks;
|
||||
select add_compression_policy(:'UNCOMPRESSED_HYPER_NAME', interval '1 day');
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks';
|
||||
|
||||
DROP TABLE :UNCOMPRESSED_HYPER_NAME;
|
||||
|
||||
@ -311,7 +311,7 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
|
||||
SELECT count(*) FROM _timescaledb_catalog.hypertable_compression;
|
||||
|
||||
--verify that the policy is gone
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_policy_compress_chunks;
|
||||
SELECT count(*) FROM _timescaledb_config.bgw_job WHERE job_type = 'compress_chunks';
|
||||
|
||||
ROLLBACK;
|
||||
|
||||
@ -370,12 +370,12 @@ WHERE hypertable.table_name like 'test1' and chunk.compressed_chunk_id IS NOT NU
|
||||
)
|
||||
AS sub;
|
||||
|
||||
select add_compress_chunks_policy('test1', interval '1 day');
|
||||
select add_compression_policy('test1', interval '1 day');
|
||||
\set ON_ERROR_STOP 0
|
||||
ALTER table test1 set (timescaledb.compress='f');
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
select remove_compress_chunks_policy('test1');
|
||||
select remove_compression_policy('test1');
|
||||
ALTER table test1 set (timescaledb.compress='f');
|
||||
|
||||
--only one hypertable left
|
||||
@ -403,8 +403,6 @@ WHERE hypertable.table_name like 'test1' and chunk.compressed_chunk_id IS NULL O
|
||||
)
|
||||
AS sub;
|
||||
|
||||
|
||||
|
||||
DROP TABLE test1 CASCADE;
|
||||
DROP TABLESPACE tablespace1;
|
||||
DROP TABLESPACE tablespace2;
|
||||
@ -447,8 +445,3 @@ FROM _timescaledb_catalog.chunk chunk
|
||||
INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id)
|
||||
WHERE hypertable.table_name like 'test1' ORDER BY chunk.id ) as subq;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -35,13 +35,13 @@ select compress_chunk(ch1.schema_name|| '.' || ch1.table_name)
|
||||
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions' and ch1.compressed_chunk_id IS NULL;
|
||||
select decompress_chunk(ch1.schema_name|| '.' || ch1.table_name)
|
||||
FROM _timescaledb_catalog.chunk ch1, _timescaledb_catalog.hypertable ht where ch1.hypertable_id = ht.id and ht.table_name like 'conditions';
|
||||
select add_compress_chunks_policy('conditions', '1day'::interval);
|
||||
select add_compression_policy('conditions', '1day'::interval);
|
||||
|
||||
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
|
||||
select add_compress_chunks_policy('conditions', '1day'::interval);
|
||||
select add_compression_policy('conditions', '1day'::interval);
|
||||
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER_2
|
||||
--try dropping policy
|
||||
select remove_compress_chunks_policy('conditions', true);
|
||||
select remove_compression_policy('conditions', true);
|
||||
|
||||
--Tests for GRANTS.
|
||||
-- as owner grant select , compress chunk and check SELECT works
|
||||
|
@ -52,7 +52,7 @@ ts_test_auto_reorder(PG_FUNCTION_ARGS)
|
||||
"that cannot accept type record")));
|
||||
}
|
||||
|
||||
execute_reorder_policy(job->fd.id, job->fd.config, dummy_reorder_func, false);
|
||||
policy_reorder_execute(job->fd.id, job->fd.config, dummy_reorder_func, false);
|
||||
|
||||
values[0] = ObjectIdGetDatum(chunk_oid);
|
||||
values[1] = ObjectIdGetDatum(index_oid);
|
||||
@ -80,5 +80,5 @@ ts_test_auto_compress_chunks(PG_FUNCTION_ARGS)
|
||||
/*since we didn't come through the scheduler, need to mark job
|
||||
* as started to create a job_stat record */
|
||||
ts_bgw_job_stat_mark_start(job_id);
|
||||
return execute_compress_chunks_policy(job);
|
||||
return policy_compression_execute(job->fd.id, job->fd.config);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user