Add compress chunks policy support

Add and drop compress chunks policy using bgw
infrastructure.
This commit is contained in:
gayyappan 2019-08-21 13:01:05 -04:00 committed by Matvey Arye
parent 5c891f732e
commit 6e60d2614c
42 changed files with 1005 additions and 81 deletions

View File

@ -30,6 +30,11 @@ CREATE OR REPLACE FUNCTION add_reorder_policy(hypertable REGCLASS, index_name NA
AS '@MODULE_PATHNAME@', 'ts_add_reorder_policy'
LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE FUNCTION add_compress_chunks_policy(hypertable REGCLASS, older_than "any", if_not_exists BOOL = false)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_add_compress_chunks_policy'
LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE FUNCTION remove_drop_chunks_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_remove_drop_chunks_policy'
LANGUAGE C VOLATILE STRICT;
@ -38,6 +43,10 @@ CREATE OR REPLACE FUNCTION remove_reorder_policy(hypertable REGCLASS, if_exists
AS '@MODULE_PATHNAME@', 'ts_remove_reorder_policy'
LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE FUNCTION remove_compress_chunks_policy(hypertable REGCLASS, if_exists BOOL = false) RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_remove_compress_chunks_policy'
LANGUAGE C VOLATILE STRICT;
-- Returns the updated job schedule values
CREATE OR REPLACE FUNCTION alter_job_schedule(
job_id INTEGER,

View File

@ -174,7 +174,7 @@ CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_job (
max_runtime INTERVAL NOT NULL,
max_retries INT NOT NULL,
retry_period INTERVAL NOT NULL,
CONSTRAINT valid_job_type CHECK (job_type IN ('telemetry_and_version_check_if_enabled', 'reorder', 'drop_chunks', 'continuous_aggregate'))
CONSTRAINT valid_job_type CHECK (job_type IN ('telemetry_and_version_check_if_enabled', 'reorder', 'drop_chunks', 'continuous_aggregate', 'compress_chunks'))
);
ALTER SEQUENCE _timescaledb_config.bgw_job_id_seq OWNED BY _timescaledb_config.bgw_job.id;
@ -332,14 +332,14 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size (
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');
CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_compress_chunks_policy(
hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
older_than BIGINT NOT NULL,
job_id SMALLINT REFERENCES _timescaledb_config.bgw_job(id),
UNIQUE (hypertable_id, job_id)
CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_policy_compress_chunks(
job_id INTEGER PRIMARY KEY REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE,
hypertable_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
older_than _timescaledb_catalog.ts_interval NOT NULL,
CONSTRAINT valid_older_than CHECK(_timescaledb_internal.valid_ts_interval(older_than))
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_compress_chunks_policy', '');
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_compress_chunks', '');
-- Set table permissions

View File

@ -156,20 +156,19 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.compression_chunk_size (
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');
CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_compress_chunks_policy(
hypertable_id INTEGER REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
older_than BIGINT NOT NULL,
job_id SMALLINT REFERENCES _timescaledb_config.bgw_job(id),
UNIQUE (hypertable_id, job_id)
CREATE TABLE IF NOT EXISTS _timescaledb_config.bgw_policy_compress_chunks(
job_id INTEGER PRIMARY KEY REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE,
hypertable_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_catalog.hypertable(id) ON DELETE CASCADE,
older_than _timescaledb_catalog.ts_interval NOT NULL,
CONSTRAINT valid_older_than CHECK(_timescaledb_internal.valid_ts_interval(older_than))
);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_compress_chunks_policy', '');
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_config.bgw_policy_compress_chunks', '');
GRANT SELECT ON _timescaledb_catalog.compression_algorithm TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.hypertable_compression TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.compression_chunk_size TO PUBLIC;
GRANT SELECT ON _timescaledb_config.bgw_compress_chunks_policy TO PUBLIC;
GRANT SELECT ON _timescaledb_config.bgw_policy_compress_chunks TO PUBLIC;
CREATE TYPE _timescaledb_internal.compressed_data;

View File

@ -27,6 +27,7 @@
#include "telemetry/telemetry.h"
#include "bgw_policy/chunk_stats.h"
#include "bgw_policy/drop_chunks.h"
#include "bgw_policy/compress_chunks.h"
#include "bgw_policy/reorder.h"
#include "scan_iterator.h"
@ -39,6 +40,7 @@ static const char *job_type_names[_MAX_JOB_TYPE] = {
[JOB_TYPE_REORDER] = "reorder",
[JOB_TYPE_DROP_CHUNKS] = "drop_chunks",
[JOB_TYPE_CONTINUOUS_AGGREGATE] = "continuous_aggregate",
[JOB_TYPE_COMPRESS_CHUNKS] = "compress_chunks",
[JOB_TYPE_UNKNOWN] = "unknown",
};
@ -110,6 +112,13 @@ ts_bgw_job_owner(BgwJob *job)
return ts_rel_get_owner(ts_continuous_agg_get_user_view_oid(ca));
}
case JOB_TYPE_COMPRESS_CHUNKS:
{
BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_job(job->fd.id);
if (policy == NULL)
elog(ERROR, "compress chunks policy for job with id \"%d\" not found", job->fd.id);
return ts_rel_get_owner(ts_hypertable_id_to_relid(policy->fd.hypertable_id));
}
case JOB_TYPE_UNKNOWN:
if (unknown_job_type_owner_hook != NULL)
return unknown_job_type_owner_hook(job);
@ -473,6 +482,7 @@ ts_bgw_job_execute(BgwJob *job)
case JOB_TYPE_REORDER:
case JOB_TYPE_DROP_CHUNKS:
case JOB_TYPE_CONTINUOUS_AGGREGATE:
case JOB_TYPE_COMPRESS_CHUNKS:
return ts_cm_functions->bgw_policy_job_execute(job);
case JOB_TYPE_UNKNOWN:
if (unknown_job_type_hook != NULL)

View File

@ -19,6 +19,7 @@ typedef enum JobType
JOB_TYPE_REORDER,
JOB_TYPE_DROP_CHUNKS,
JOB_TYPE_CONTINUOUS_AGGREGATE,
JOB_TYPE_COMPRESS_CHUNKS,
/* end of real jobs */
JOB_TYPE_UNKNOWN,
_MAX_JOB_TYPE

View File

@ -22,7 +22,7 @@ typedef enum JobResult
extern TSDLLEXPORT BgwJobStat *ts_bgw_job_stat_find(int job_id);
extern void ts_bgw_job_stat_delete(int job_id);
extern void ts_bgw_job_stat_mark_start(int32 bgw_job_id);
extern TSDLLEXPORT void ts_bgw_job_stat_mark_start(int32 bgw_job_id);
extern void ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result);
extern bool ts_bgw_job_stat_end_was_marked(BgwJobStat *jobstat);

View File

@ -1,6 +1,7 @@
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/reorder.c
${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks.c
${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks.c
${CMAKE_CURRENT_SOURCE_DIR}/policy.c
${CMAKE_CURRENT_SOURCE_DIR}/chunk_stats.c
)

View File

@ -0,0 +1,157 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <funcapi.h>
#include "bgw/job.h"
#include "catalog.h"
#include "compress_chunks.h"
#include "hypertable.h"
#include "interval.h"
#include "policy.h"
#include "scanner.h"
#include "scan_iterator.h"
#include "utils.h"
static ScanTupleResult
bgw_policy_compress_chunks_tuple_found(TupleInfo *ti, void *const data)
{
BgwPolicyCompressChunks **policy = data;
bool nulls[Natts_bgw_policy_compress_chunks];
Datum values[Natts_bgw_policy_compress_chunks];
heap_deform_tuple(ti->tuple, ti->desc, values, nulls);
*policy = MemoryContextAllocZero(ti->mctx, sizeof(BgwPolicyCompressChunks));
Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]);
(*policy)->fd.job_id =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]);
(*policy)->fd.hypertable_id = DatumGetInt32(
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]);
(*policy)->fd.older_than = *ts_interval_from_tuple(
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)]);
return SCAN_CONTINUE;
}
static ScanTupleResult
compress_policy_delete_row_tuple_found(TupleInfo *ti, void *const data)
{
CatalogSecurityContext sec_ctx;
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_delete(ti->scanrel, ti->tuple);
ts_catalog_restore_user(&sec_ctx);
return SCAN_CONTINUE;
}
/*deletes only from the compress_chunks policy table. need to remove the job separately */
bool
ts_bgw_policy_compress_chunks_delete(int32 job_id)
{
ScanKeyData scankey[1];
ScanKeyInit(&scankey[0],
Anum_bgw_policy_compress_chunks_pkey_job_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(job_id));
return ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS,
BGW_POLICY_COMPRESS_CHUNKS_PKEY,
scankey,
1,
compress_policy_delete_row_tuple_found,
RowExclusiveLock,
BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
NULL);
}
BgwPolicyCompressChunks *
ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id)
{
ScanKeyData scankey[1];
BgwPolicyCompressChunks *ret = NULL;
ScanKeyInit(&scankey[0],
Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(hypertable_id));
ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS,
BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY,
scankey,
1,
bgw_policy_compress_chunks_tuple_found,
RowExclusiveLock,
BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
(void *) &ret);
return ret;
}
void
ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy)
{
TupleDesc tupdesc;
CatalogSecurityContext sec_ctx;
Datum values[Natts_bgw_policy_compress_chunks];
bool nulls[Natts_bgw_policy_compress_chunks] = { false };
HeapTuple ht_older_than;
Catalog *catalog = ts_catalog_get();
Relation rel =
heap_open(catalog_get_table_id(catalog, BGW_POLICY_COMPRESS_CHUNKS), RowExclusiveLock);
tupdesc = RelationGetDescr(rel);
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_job_id)] =
Int32GetDatum(policy->fd.job_id);
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_hypertable_id)] =
Int32GetDatum(policy->fd.hypertable_id);
ht_older_than = ts_interval_form_heaptuple(&policy->fd.older_than);
values[AttrNumberGetAttrOffset(Anum_bgw_policy_compress_chunks_older_than)] =
HeapTupleGetDatum(ht_older_than);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, tupdesc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
heap_freetuple(ht_older_than);
heap_close(rel, RowExclusiveLock);
}
BgwPolicyCompressChunks *
ts_bgw_policy_compress_chunks_find_by_job(int32 job_id)
{
ScanKeyData scankey[1];
BgwPolicyCompressChunks *ret = NULL;
ScanKeyInit(&scankey[0],
Anum_bgw_policy_compress_chunks_pkey_job_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(job_id));
ts_catalog_scan_one(BGW_POLICY_COMPRESS_CHUNKS,
BGW_POLICY_COMPRESS_CHUNKS_PKEY,
scankey,
1,
bgw_policy_compress_chunks_tuple_found,
RowExclusiveLock,
BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
(void *) &ret);
return ret;
}

View File

@ -0,0 +1,23 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H
#define TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H
#include "catalog.h"
#include "export.h"
typedef struct BgwPolicyCompressChunks
{
FormData_bgw_policy_compress_chunks fd;
} BgwPolicyCompressChunks;
extern TSDLLEXPORT BgwPolicyCompressChunks *ts_bgw_policy_compress_chunks_find_by_job(int32 job_id);
extern TSDLLEXPORT BgwPolicyCompressChunks *
ts_bgw_policy_compress_chunks_find_by_hypertable(int32 hypertable_id);
extern TSDLLEXPORT void ts_bgw_policy_compress_chunks_insert(BgwPolicyCompressChunks *policy);
extern TSDLLEXPORT bool ts_bgw_policy_compress_chunks_delete(int32 job_id);
#endif /* TIMESCALEDB_BGW_POLICY_COMPRESS_CHUNKS_H */

View File

@ -106,9 +106,9 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = COMPRESSION_CHUNK_SIZE_TABLE_NAME,
},
[BGW_COMPRESS_CHUNKS_POLICY] = {
[BGW_POLICY_COMPRESS_CHUNKS] = {
.schema_name = CONFIG_SCHEMA_NAME,
.table_name = BGW_COMPRESS_CHUNKS_POLICY_TABLE_NAME,
.table_name = BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME,
},
[_MAX_CATALOG_TABLES] = {
.schema_name = "invalid schema",
@ -250,10 +250,11 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
[COMPRESSION_CHUNK_SIZE_PKEY] = "compression_chunk_size_pkey",
},
},
[BGW_COMPRESS_CHUNKS_POLICY] = {
.length = _MAX_BGW_COMPRESS_CHUNKS_POLICY_INDEX,
[BGW_POLICY_COMPRESS_CHUNKS] = {
.length = _MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX,
.names = (char *[]) {
[BGW_COMPRESS_CHUNKS_POLICY_HYPERTABLE_ID_JOB_ID_KEY] = "bgw_compress_chunks_policy_hypertable_id_job_id_key",
[BGW_POLICY_COMPRESS_CHUNKS_PKEY] = "bgw_policy_compress_chunks_pkey",
[BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY] = "bgw_policy_compress_chunks_hypertable_id_key",
},
},
};
@ -276,7 +277,7 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
[CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG] = NULL,
[HYPERTABLE_COMPRESSION] = NULL,
[COMPRESSION_CHUNK_SIZE] = NULL,
[BGW_COMPRESS_CHUNKS_POLICY] = NULL,
[BGW_POLICY_COMPRESS_CHUNKS] = NULL,
};
typedef struct InternalFunctionDef

View File

@ -54,7 +54,7 @@ typedef enum CatalogTable
CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG,
HYPERTABLE_COMPRESSION,
COMPRESSION_CHUNK_SIZE,
BGW_COMPRESS_CHUNKS_POLICY,
BGW_POLICY_COMPRESS_CHUNKS,
_MAX_CATALOG_TABLES,
} CatalogTable;
@ -1132,40 +1132,49 @@ typedef enum Anum_compression_chunk_size_pkey
#define Natts_compression_chunk_size_pkey (_Anum_compression_chunk_size_pkey_max - 1)
#define BGW_COMPRESS_CHUNKS_POLICY_TABLE_NAME "bgw_compress_chunks_policy"
typedef enum Anum_bgw_compress_chunks_policy
#define BGW_POLICY_COMPRESS_CHUNKS_TABLE_NAME "bgw_policy_compress_chunks"
typedef enum Anum_bgw_policy_compress_chunks
{
Anum_bgw_compress_chunks_policy_hypertable_id = 1,
Anum_bgw_compress_chunks_policy_older_than,
Anum_bgw_compress_chunks_policy_job_id,
_Anum_bgw_compress_chunks_policy_max,
} Anum_bgw_compress_chunks_policy;
Anum_bgw_policy_compress_chunks_job_id = 1,
Anum_bgw_policy_compress_chunks_hypertable_id,
Anum_bgw_policy_compress_chunks_older_than,
_Anum_bgw_policy_compress_chunks_max,
} Anum_bgw_policy_compress_chunks;
#define Natts_bgw_compress_chunks_policy (_Anum_bgw_compress_chunks_policy_max - 1)
#define Natts_bgw_policy_compress_chunks (_Anum_bgw_policy_compress_chunks_max - 1)
typedef struct FormData_bgw_compress_chunks_policy
typedef struct FormData_bgw_policy_compress_chunks
{
int32 job_id;
int32 hypertable_id;
int64 older_than;
int16 job_id;
} FormData_bgw_compress_chunks_policy;
FormData_ts_interval older_than;
} FormData_bgw_policy_compress_chunks;
typedef FormData_bgw_compress_chunks_policy *Form_bgw_compress_chunks_policy;
typedef FormData_bgw_policy_compress_chunks *Form_bgw_policy_compress_chunks;
enum
{
BGW_COMPRESS_CHUNKS_POLICY_HYPERTABLE_ID_JOB_ID_KEY = 0,
_MAX_BGW_COMPRESS_CHUNKS_POLICY_INDEX,
BGW_POLICY_COMPRESS_CHUNKS_HYPERTABLE_ID_KEY = 0,
BGW_POLICY_COMPRESS_CHUNKS_PKEY,
_MAX_BGW_POLICY_COMPRESS_CHUNKS_INDEX,
};
typedef enum Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key
{
Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_hypertable_id = 1,
Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_job_id,
_Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_max,
} Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key;
#define Natts_bgw_compress_chunks_policy_hypertable_id_job_id_key \
(_Anum_bgw_compress_chunks_policy_hypertable_id_job_id_key_max - 1)
typedef enum Anum_bgw_policy_compress_chunks_hypertable_id_key
{
Anum_bgw_policy_compress_chunks_hypertable_id_key_hypertable_id = 1,
_Anum_bgw_policy_compress_chunks_hypertable_id_key_max,
} Anum_bgw_policy_compress_chunks_hypertable_id_key;
#define Natts_bgw_policy_compress_chunks_hypertable_id_key \
(_Anum_bgw_policy_compress_chunks_hypertable_id_key_max - 1)
typedef enum Anum_bgw_policy_compress_chunks_pkey
{
Anum_bgw_policy_compress_chunks_pkey_job_id = 1,
_Anum_bgw_policy_compress_chunks_pkey_max,
} Anum_bgw_policy_compress_chunks_pkey;
#define Natts_bgw_policy_compress_chunks_pkey (_Anum_bgw_policy_compress_chunks_pkey_max - 1)
/*
* The maximum number of indexes a catalog table can have.

View File

@ -2310,3 +2310,27 @@ ts_chunks_in(PG_FUNCTION_ARGS)
"with AND operator")));
pg_unreachable();
}
/* has chunk ,specifieid by chunk_id, been compressed */
bool
ts_chunk_is_compressed(int32 chunk_id)
{
bool compressed = false;
ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
ts_scan_iterator_scan_key_init(&iterator,
Anum_chunk_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(chunk_id));
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool isnull;
heap_getattr(ti->tuple, Anum_chunk_compressed_chunk_id, ti->desc, &isnull);
compressed = !isnull; // isnull is false when compress chunk id is set
}
ts_scan_iterator_close(&iterator);
return compressed;
}

View File

@ -119,6 +119,7 @@ extern TSDLLEXPORT List *ts_chunk_do_drop_chunks(Oid table_relid, Datum older_th
Oid newer_than_type, bool cascade,
bool cascades_to_materializations,
int32 log_level);
extern TSDLLEXPORT bool ts_chunk_is_compressed(int32 chunk_id);
#define chunk_get_by_name(schema_name, table_name, num_constraints, fail_if_not_found) \
ts_chunk_get_by_name_with_memory_context(schema_name, \

View File

@ -15,8 +15,10 @@
TS_FUNCTION_INFO_V1(ts_add_drop_chunks_policy);
TS_FUNCTION_INFO_V1(ts_add_reorder_policy);
TS_FUNCTION_INFO_V1(ts_add_compress_chunks_policy);
TS_FUNCTION_INFO_V1(ts_remove_drop_chunks_policy);
TS_FUNCTION_INFO_V1(ts_remove_reorder_policy);
TS_FUNCTION_INFO_V1(ts_remove_compress_chunks_policy);
TS_FUNCTION_INFO_V1(ts_alter_job_schedule);
TS_FUNCTION_INFO_V1(ts_reorder_chunk);
TS_FUNCTION_INFO_V1(ts_move_chunk);
@ -55,6 +57,12 @@ ts_add_reorder_policy(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(ts_cm_functions->add_reorder_policy(fcinfo));
}
Datum
ts_add_compress_chunks_policy(PG_FUNCTION_ARGS)
{
PG_RETURN_DATUM(ts_cm_functions->add_compress_chunks_policy(fcinfo));
}
Datum
ts_remove_drop_chunks_policy(PG_FUNCTION_ARGS)
{
@ -67,6 +75,12 @@ ts_remove_reorder_policy(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(ts_cm_functions->remove_reorder_policy(fcinfo));
}
Datum
ts_remove_compress_chunks_policy(PG_FUNCTION_ARGS)
{
return ts_cm_functions->remove_compress_chunks_policy(fcinfo);
}
Datum
ts_alter_job_schedule(PG_FUNCTION_ARGS)
{
@ -388,8 +402,10 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_materialize = cagg_materialize_default_fn,
.add_drop_chunks_policy = error_no_default_fn_pg_enterprise,
.add_reorder_policy = error_no_default_fn_pg_enterprise,
.add_compress_chunks_policy = error_no_default_fn_pg_enterprise,
.remove_drop_chunks_policy = error_no_default_fn_pg_enterprise,
.remove_reorder_policy = error_no_default_fn_pg_enterprise,
.remove_compress_chunks_policy = error_no_default_fn_pg_enterprise,
.create_upper_paths_hook = NULL,
.set_rel_pathlist_hook = NULL,
.gapfill_marker = error_no_default_fn_pg_community,

View File

@ -40,8 +40,10 @@ typedef struct CrossModuleFunctions
bool (*continuous_agg_materialize)(int32 materialization_id, bool verbose);
Datum (*add_drop_chunks_policy)(PG_FUNCTION_ARGS);
Datum (*add_reorder_policy)(PG_FUNCTION_ARGS);
Datum (*add_compress_chunks_policy)(PG_FUNCTION_ARGS);
Datum (*remove_drop_chunks_policy)(PG_FUNCTION_ARGS);
Datum (*remove_reorder_policy)(PG_FUNCTION_ARGS);
Datum (*remove_compress_chunks_policy)(PG_FUNCTION_ARGS);
void (*create_upper_paths_hook)(PlannerInfo *, UpperRelationKind, RelOptInfo *, RelOptInfo *);
void (*set_rel_pathlist_hook)(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *,
Hypertable *);

View File

@ -778,3 +778,44 @@ ts_dimension_slice_oldest_chunk_without_executed_job(int32 job_id, int32 dimensi
return info.chunk_id;
}
static ScanTupleResult
dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *data)
{
ListCell *lc;
DimensionSlice *slice = dimension_slice_from_tuple(ti->tuple);
List *chunk_ids = NIL;
ts_chunk_constraint_scan_by_dimension_slice_to_list(slice, &chunk_ids, CurrentMemoryContext);
foreach (lc, chunk_ids)
{
int32 chunk_id = lfirst_int(lc);
if (!ts_chunk_is_compressed(chunk_id))
{
/* found a chunk that has not yet been compressed */
*((int32 *) data) = chunk_id;
return SCAN_DONE;
}
}
return SCAN_CONTINUE;
}
int32
ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id, StrategyNumber start_strategy,
int64 start_value, StrategyNumber end_strategy,
int64 end_value)
{
int32 chunk_id_ret = INVALID_CHUNK_ID;
dimension_slice_scan_with_strategies(dimension_id,
start_strategy,
start_value,
end_strategy,
end_value,
&chunk_id_ret,
dimension_slice_check_is_chunk_uncompressed_tuple_found,
-1);
return chunk_id_ret;
}

View File

@ -76,7 +76,11 @@ extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_latest_slice(int32 dim
extern TSDLLEXPORT int ts_dimension_slice_oldest_chunk_without_executed_job(
int32 job_id, int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
StrategyNumber end_strategy, int64 end_value);
extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id,
StrategyNumber start_strategy,
int64 start_value,
StrategyNumber end_strategy,
int64 end_value);
#define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1)
#define dimension_slice_scan(dimension_id, coordinate) \

View File

@ -45,6 +45,7 @@
#include "hypertable_cache.h"
#include "trigger.h"
#include "scanner.h"
#include "scan_iterator.h"
#include "catalog.h"
#include "dimension_slice.h"
#include "dimension_vector.h"
@ -2136,3 +2137,27 @@ ts_hypertable_create_compressed(Oid table_relid, int32 hypertable_id)
heap_close(rel, NoLock);
return true;
}
/* is this a internal hypertable created for compression */
TSDLLEXPORT bool
ts_hypertable_is_compressed_internal(int32 compressed_hypertable_id)
{
bool compressed = false;
ScanIterator iterator =
ts_scan_iterator_create(HYPERTABLE, AccessShareLock, CurrentMemoryContext);
iterator.ctx.index = catalog_get_index(ts_catalog_get(), HYPERTABLE, HYPERTABLE_ID_INDEX);
ts_scan_iterator_scan_key_init(&iterator,
Anum_hypertable_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(compressed_hypertable_id));
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool isnull;
compressed =
DatumGetBool(heap_getattr(ti->tuple, Anum_hypertable_compressed, ti->desc, &isnull));
}
return compressed;
}

View File

@ -112,6 +112,7 @@ extern List *ts_hypertable_get_all_by_name(Name schema_name, Name table_name, Me
extern bool ts_is_partitioning_column(Hypertable *ht, Index column_attno);
extern TSDLLEXPORT bool ts_hypertable_set_compressed_id(Hypertable *ht,
int32 compressed_hypertable_id);
extern TSDLLEXPORT bool ts_hypertable_is_compressed_internal(int32 compressed_hypertable_id);
#define hypertable_scan(schema, table, tuple_found, data, lockmode, tuplock) \
ts_hypertable_scan_with_memory_context(schema, \

View File

@ -17,5 +17,6 @@ hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Da
bool *nulls);
extern TSDLLEXPORT bool hypertable_compression_delete_by_hypertable_id(int32 htid);
extern TSDLLEXPORT bool ts_is_compression_hypertable(int32 hypertable_id);
#endif

View File

@ -14,6 +14,7 @@ WHERE OID IN (
ORDER BY proname;
proname
----------------------------------
add_compress_chunks_policy
add_dimension
add_drop_chunks_policy
add_reorder_policy
@ -39,6 +40,7 @@ ORDER BY proname;
last
locf
move_chunk
remove_compress_chunks_policy
remove_drop_chunks_policy
remove_reorder_policy
reorder_chunk
@ -52,5 +54,5 @@ ORDER BY proname;
time_bucket_gapfill
timescaledb_post_restore
timescaledb_pre_restore
(38 rows)
(40 rows)

View File

@ -1,6 +1,7 @@
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/reorder_api.c
${CMAKE_CURRENT_SOURCE_DIR}/drop_chunks_api.c
${CMAKE_CURRENT_SOURCE_DIR}/compress_chunks_api.c
${CMAKE_CURRENT_SOURCE_DIR}/job.c
)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})

View File

@ -0,0 +1,189 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/xact.h>
#include <miscadmin.h>
#include <utils/builtins.h>
#include "bgw_policy/compress_chunks.h"
#include "bgw/job.h"
#include "compress_chunks_api.h"
#include "errors.h"
#include "hypertable.h"
#include "hypertable_cache.h"
#include "interval.h"
#include "license.h"
#include "utils.h"
/*
* Default scheduled interval for compress jobs = default chunk length.
* If this is non-timestamp based hypertable, then default is 1 day
*/
#define DEFAULT_SCHEDULE_INTERVAL \
DatumGetIntervalP(DirectFunctionCall7(make_interval, \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(1), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Float8GetDatum(0)))
/* Default max runtime is unlimited for compress chunks */
#define DEFAULT_MAX_RUNTIME \
DatumGetIntervalP(DirectFunctionCall7(make_interval, \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Float8GetDatum(0)))
/* Right now, there is an infinite number of retries for compress_chunks jobs */
#define DEFAULT_MAX_RETRIES -1
/* Default retry period for reorder_jobs is currently 1 hour */
#define DEFAULT_RETRY_PERIOD \
DatumGetIntervalP(DirectFunctionCall7(make_interval, \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(0), \
Int32GetDatum(1), \
Int32GetDatum(0), \
Float8GetDatum(0)))
Datum
compress_chunks_add_policy(PG_FUNCTION_ARGS)
{
NameData application_name;
NameData compress_chunks_name;
int32 job_id;
BgwPolicyCompressChunks *existing;
Oid ht_oid = PG_GETARG_OID(0);
Datum older_than_datum = PG_GETARG_DATUM(1);
Oid older_than_type = PG_ARGISNULL(1) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 1);
bool if_not_exists = PG_GETARG_BOOL(2);
Interval *default_schedule_interval = DEFAULT_SCHEDULE_INTERVAL;
BgwPolicyCompressChunks policy;
Hypertable *hypertable;
Cache *hcache;
Dimension *dim;
FormData_ts_interval *older_than;
ts_hypertable_permissions_check(ht_oid, GetUserId());
older_than = ts_interval_from_sql_input(ht_oid,
older_than_datum,
older_than_type,
"older_than",
"compress_chunks_add_policy");
/* check if this is a table with compression enabled */
hcache = ts_hypertable_cache_pin();
hypertable = ts_hypertable_cache_get_entry(hcache, ht_oid);
if (!hypertable || !TS_HYPERTABLE_HAS_COMPRESSION_ON(hypertable))
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can add compress_chunks policy only on hypertables with compression "
"enabled")));
}
/* Make sure that an existing policy doesn't exist on this hypertable */
existing = ts_bgw_policy_compress_chunks_find_by_hypertable(hypertable->fd.id);
if (existing != NULL)
{
if (!if_not_exists)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("compress chunks policy already exists for hypertable \"%s\"",
get_rel_name(ht_oid))));
}
if (ts_interval_equal(&existing->fd.older_than, older_than))
{
/* If all arguments are the same, do nothing */
ts_cache_release(hcache);
ereport(NOTICE,
(errmsg("compress chunks policy already exists on hypertable \"%s\", skipping",
get_rel_name(ht_oid))));
PG_RETURN_INT32(-1);
}
else
{
ts_cache_release(hcache);
elog(WARNING,
"could not add compress_chunks policy due to existing policy on hypertable with "
"different arguments");
PG_RETURN_INT32(-1);
}
}
dim = hyperspace_get_open_dimension(hypertable->space, 0);
if (dim && IS_TIMESTAMP_TYPE(ts_dimension_get_partition_type(dim)))
{
default_schedule_interval = DatumGetIntervalP(
ts_internal_to_interval_value(dim->fd.interval_length / 2, INTERVALOID));
}
/* insert a new job into jobs table */
namestrcpy(&application_name, "Compress Chunks Background Job");
namestrcpy(&compress_chunks_name, "compress_chunks");
job_id = ts_bgw_job_insert_relation(&application_name,
&compress_chunks_name,
default_schedule_interval,
DEFAULT_MAX_RUNTIME,
DEFAULT_MAX_RETRIES,
DEFAULT_RETRY_PERIOD);
policy = (BgwPolicyCompressChunks){ .fd = {
.job_id = job_id,
.hypertable_id = ts_hypertable_relid_to_id(ht_oid),
.older_than = *older_than,
} };
/* Now, insert a new row in the compress_chunks args table */
ts_bgw_policy_compress_chunks_insert(&policy);
ts_cache_release(hcache);
PG_RETURN_INT32(job_id);
}
Datum
compress_chunks_remove_policy(PG_FUNCTION_ARGS)
{
Oid hypertable_oid = PG_GETARG_OID(0);
bool if_exists = PG_GETARG_BOOL(1);
/* Remove the job, then remove the policy */
int ht_id = ts_hypertable_relid_to_id(hypertable_oid);
BgwPolicyCompressChunks *policy = ts_bgw_policy_compress_chunks_find_by_hypertable(ht_id);
ts_hypertable_permissions_check(hypertable_oid, GetUserId());
if (policy == NULL)
{
if (!if_exists)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("cannot remove compress chunks policy, no such policy exists")));
else
{
ereport(NOTICE,
(errmsg("compress chunks policy does not exist on hypertable \"%s\", skipping",
get_rel_name(hypertable_oid))));
PG_RETURN_BOOL(false);
}
}
ts_bgw_job_delete_by_id(policy->fd.job_id);
/* Now, delete from policy table */
ts_bgw_policy_compress_chunks_delete(policy->fd.job_id);
PG_RETURN_BOOL(true);
}

View File

@ -0,0 +1,15 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H
#define TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H
#include <postgres.h>
/* User-facing API functions */
extern Datum compress_chunks_add_policy(PG_FUNCTION_ARGS);
extern Datum compress_chunks_remove_policy(PG_FUNCTION_ARGS);
#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESS_CHUNKS_API_H */

View File

@ -88,7 +88,7 @@ drop_chunks_add_policy(PG_FUNCTION_ARGS)
ereport(NOTICE,
(errmsg("drop chunks policy already exists on hypertable \"%s\", skipping",
get_rel_name(ht_oid))));
return -1;
PG_RETURN_INT32(-1);
}
else
{
@ -96,7 +96,7 @@ drop_chunks_add_policy(PG_FUNCTION_ARGS)
elog(WARNING,
"could not add drop_chunks policy due to existing policy on hypertable with "
"different arguments");
return -1;
PG_RETURN_INT32(-1);
}
}

View File

@ -15,10 +15,13 @@
#include <utils/snapmgr.h>
#include "bgw/timer.h"
#include "bgw/job.h"
#include "bgw/job_stat.h"
#include "bgw_policy/chunk_stats.h"
#include "bgw_policy/drop_chunks.h"
#include "bgw_policy/compress_chunks.h"
#include "bgw_policy/reorder.h"
#include "compression/compress_utils.h"
#include "continuous_aggs/materialize.h"
#include "continuous_aggs/job.h"
@ -77,6 +80,21 @@ get_chunk_id_to_reorder(int32 job_id, Hypertable *ht)
-1);
}
static int32
get_chunk_to_compress(Hypertable *ht, FormData_ts_interval *older_than)
{
Dimension *open_dim = hyperspace_get_open_dimension(ht->space, 0);
StrategyNumber end_strategy = BTLessStrategyNumber;
Oid partitioning_type = ts_dimension_get_partition_type(open_dim);
int64 end_value = ts_time_value_to_internal(ts_interval_subtract_from_now(older_than, open_dim),
partitioning_type);
return ts_dimension_slice_get_chunkid_to_compress(open_dim->fd.id,
InvalidStrategy, /*start_strategy*/
-1, /*start_value*/
end_strategy,
end_value);
}
bool
execute_reorder_policy(BgwJob *job, reorder_func reorder, bool fast_continue)
{
@ -241,6 +259,77 @@ execute_materialize_continuous_aggregate(BgwJob *job)
return true;
}
bool
execute_compress_chunks_policy(BgwJob *job)
{
bool started = false;
BgwPolicyCompressChunks *args;
Oid table_relid;
Hypertable *ht;
Cache *hcache;
int32 chunkid;
Chunk *chunk = NULL;
int job_id = job->fd.id;
if (!IsTransactionOrTransactionBlock())
{
started = true;
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
}
/* Get the arguments from the compress_chunks_policy table */
args = ts_bgw_policy_compress_chunks_find_by_job(job_id);
if (args == NULL)
ereport(ERROR,
(errcode(ERRCODE_TS_INTERNAL_ERROR),
errmsg("could not run compress_chunks policy #%d because no args in policy table",
job_id)));
table_relid = ts_hypertable_id_to_relid(args->fd.hypertable_id);
hcache = ts_hypertable_cache_pin();
ht = ts_hypertable_cache_get_entry(hcache, table_relid);
/* First verify that the hypertable corresponds to a valid table */
if (ht == NULL)
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("could not run compress_chunks policy #%d because \"%s\" is not a "
"hypertable",
job_id,
get_rel_name(table_relid))));
chunkid = get_chunk_to_compress(ht, &args->fd.older_than);
if (chunkid == INVALID_CHUNK_ID)
{
elog(NOTICE,
"no chunks for hypertable %s.%s that satisfy compress chunk policy",
ht->fd.schema_name.data,
ht->fd.table_name.data);
}
else
{
chunk = ts_chunk_get_by_id(chunkid, 0, true);
tsl_compress_chunk_wrapper(chunk->table_id);
elog(LOG,
"completed compressing chunk %s.%s",
NameStr(chunk->fd.schema_name),
NameStr(chunk->fd.table_name));
}
chunkid = get_chunk_to_compress(ht, &args->fd.older_than);
if (chunkid != INVALID_CHUNK_ID)
enable_fast_restart(job, "compress_chunks");
ts_cache_release(hcache);
if (started)
{
PopActiveSnapshot();
CommitTransactionCommand();
}
return true;
}
static bool
bgw_policy_job_requires_enterprise_license(BgwJob *job)
{
@ -254,6 +343,8 @@ bgw_policy_job_requires_enterprise_license(BgwJob *job)
return true;
case JOB_TYPE_CONTINUOUS_AGGREGATE:
return false;
case JOB_TYPE_COMPRESS_CHUNKS:
return true;
default:
elog(ERROR,
"scheduler could not determine the license type for job type: \"%s\"",
@ -277,6 +368,8 @@ tsl_bgw_policy_job_execute(BgwJob *job)
return execute_drop_chunks_policy(job->fd.id);
case JOB_TYPE_CONTINUOUS_AGGREGATE:
return execute_materialize_continuous_aggregate(job);
case JOB_TYPE_COMPRESS_CHUNKS:
return execute_compress_chunks_policy(job);
default:
elog(ERROR,
"scheduler tried to run an invalid job type: \"%s\"",

View File

@ -20,7 +20,7 @@ typedef void (*reorder_func)(Oid tableOid, Oid indexOid, bool verbose, Oid wait_
/* Functions exposed only for testing */
extern bool execute_reorder_policy(BgwJob *job, reorder_func reorder, bool fast_continue);
extern bool execute_drop_chunks_policy(int32 job_id);
extern bool execute_compress_chunks_policy(BgwJob *job);
extern bool tsl_bgw_policy_job_execute(BgwJob *job);
extern Datum bgw_policy_alter_job_schedule(PG_FUNCTION_ARGS);

View File

@ -120,13 +120,13 @@ reorder_add_policy(PG_FUNCTION_ARGS)
elog(WARNING,
"could not add reorder policy due to existing policy on hypertable with different "
"arguments");
return -1;
PG_RETURN_INT32(-1);
}
/* If all arguments are the same, do nothing */
ereport(NOTICE,
(errmsg("reorder policy already exists on hypertable \"%s\", skipping",
get_rel_name(ht_oid))));
return -1;
PG_RETURN_INT32(-1);
}
/* Next, insert a new job into jobs table */

View File

@ -281,20 +281,23 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
ts_cache_release(hcache);
}
Datum
tsl_compress_chunk(PG_FUNCTION_ARGS)
void
tsl_compress_chunk_wrapper(Oid chunk_relid)
{
Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
Chunk *srcchunk = ts_chunk_get_by_relid(chunk_id, 0, true);
Chunk *srcchunk = ts_chunk_get_by_relid(chunk_relid, 0, true);
if (srcchunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("chunk is already compressed")));
}
license_enforce_enterprise_enabled();
license_print_expiration_warning_if_needed();
compress_chunk_impl(srcchunk->hypertable_relid, chunk_relid);
}
compress_chunk_impl(srcchunk->hypertable_relid, chunk_id);
Datum
tsl_compress_chunk(PG_FUNCTION_ARGS)
{
Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
tsl_compress_chunk_wrapper(chunk_id);
PG_RETURN_VOID();
}
@ -306,9 +309,6 @@ tsl_decompress_chunk(PG_FUNCTION_ARGS)
if (NULL == uncompressed_chunk)
elog(ERROR, "unkown chunk id %d", uncompressed_chunk_id);
license_enforce_enterprise_enabled();
license_print_expiration_warning_if_needed();
decompress_chunk_impl(uncompressed_chunk->hypertable_relid, uncompressed_chunk_id);
PG_RETURN_VOID();
}

View File

@ -8,5 +8,6 @@
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
extern void tsl_compress_chunk_wrapper(Oid chunk_relid);
#endif // TIMESCALEDB_TSL_COMPRESSION_UTILS_H

View File

@ -4,7 +4,6 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <miscadmin.h>
#include <access/heapam.h>
#include <access/reloptions.h>
#include <access/tupdesc.h>
@ -13,24 +12,24 @@
#include <catalog/toasting.h>
#include <commands/tablecmds.h>
#include <commands/tablespace.h>
#include <miscadmin.h>
#include <nodes/makefuncs.h>
#include <storage/lmgr.h>
#include <utils/builtins.h>
#include <utils/rel.h>
#include "catalog.h"
#include "compat.h"
#include "create.h"
#include "chunk.h"
#include "chunk_index.h"
#include "trigger.h"
#include "scan_iterator.h"
#include "hypertable_cache.h"
#include "continuous_agg.h"
#include "compression_with_clause.h"
#include "compression.h"
#include "hypertable_cache.h"
#include "hypertable_compression.h"
#include "custom_type_cache.h"
#include "license.h"
#include "trigger.h"
/* entrypoint
* tsl_process_compress_table : is the entry point.
@ -489,9 +488,25 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
List *orderby_cols;
bool compression_already_enabled;
bool compressed_chunks_exist;
ContinuousAggHypertableStatus caggstat;
license_enforce_enterprise_enabled();
license_print_expiration_warning_if_needed();
/*check this is not a special internally created hypertable
* continuous agg table
* compression hypertable
*/
caggstat = ts_continuous_agg_hypertable_status(ht->fd.id);
if (!(caggstat == HypertableIsRawTable || caggstat == HypertableIsNotContinuousAgg))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("continuous aggregate tables do not support compression")));
}
if (ht->fd.compressed)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot compress internal compression hypertable")));
}
/* Lock the uncompressed ht in exclusive mode and keep till end of txn */
LockRelationOid(ht->main_table_relid, AccessExclusiveLock);
@ -502,7 +517,7 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
segmentby_cols = ts_compress_hypertable_parse_segment_by(with_clause_options, ht);
orderby_cols = ts_compress_hypertable_parse_order_by(with_clause_options, ht);
orderby_cols = add_time_to_order_by_if_not_included(orderby_cols, segmentby_cols, ht);
compression_already_enabled = ht->fd.compressed_hypertable_id != INVALID_HYPERTABLE_ID;
compression_already_enabled = TS_HYPERTABLE_HAS_COMPRESSION_ON(ht);
compressed_chunks_exist =
compression_already_enabled && ts_chunk_exists_with_compression(ht->fd.id);

View File

@ -20,6 +20,7 @@
#include "bgw_policy/job.h"
#include "bgw_policy/reorder_api.h"
#include "bgw_policy/drop_chunks_api.h"
#include "bgw_policy/compress_chunks_api.h"
#include "compression/compression.h"
#include "compression/dictionary.h"
#include "compression/gorilla.h"
@ -73,8 +74,10 @@ CrossModuleFunctions tsl_cm_functions = {
.continuous_agg_materialize = continuous_agg_materialize,
.add_drop_chunks_policy = drop_chunks_add_policy,
.add_reorder_policy = reorder_add_policy,
.add_compress_chunks_policy = compress_chunks_add_policy,
.remove_drop_chunks_policy = drop_chunks_remove_policy,
.remove_reorder_policy = reorder_remove_policy,
.remove_compress_chunks_policy = compress_chunks_remove_policy,
.create_upper_paths_hook = tsl_create_upper_paths_hook,
.set_rel_pathlist_hook = tsl_set_rel_pathlist_hook,
.gapfill_marker = gapfill_marker,

View File

@ -17,7 +17,6 @@ insert into foo values( 10 , 10 , 20, 120);
insert into foo values( 20 , 11 , 20, 13);
insert into foo values( 30 , 12 , 20, 14);
alter table foo set (timescaledb.compress, timescaledb.compress_segmentby = 'a,b', timescaledb.compress_orderby = 'c desc, d asc nulls last');
WARNING: Timescale License expired
select id, schema_name, table_name, compressed, compressed_hypertable_id from
_timescaledb_catalog.hypertable order by id;
id | schema_name | table_name | compressed | compressed_hypertable_id

View File

@ -0,0 +1,186 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers
-------------------------
t
(1 row)
SELECT _timescaledb_internal.enterprise_enabled();
enterprise_enabled
--------------------
t
(1 row)
CREATE OR REPLACE FUNCTION test_compress_chunks_policy(job_id INTEGER)
RETURNS VOID
AS :TSL_MODULE_PATHNAME, 'ts_test_auto_compress_chunks'
LANGUAGE C VOLATILE STRICT;
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
location TEXT NOT NULL,
location2 char(10) NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::interval);
create_hypertable
-------------------------
(1,public,conditions,t)
(1 row)
--TEST 1--
--cannot set policy without enabling compression --
\set ON_ERROR_STOP 0
select add_compress_chunks_policy('conditions', '60d'::interval);
ERROR: can add compress_chunks policy only on hypertables with compression enabled
\set ON_ERROR_STOP 1
-- TEST2 --
--add a policy to compress chunks --
alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time');
insert into conditions
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
select add_compress_chunks_policy('conditions', '60d'::interval);
add_compress_chunks_policy
----------------------------
1000
(1 row)
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
compressjob_id | hypertable_id | older_than
----------------+---------------+------------------
1000 | 1 | (t,"@ 60 days",)
(1 row)
\gset
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period
------+--------------------------------+-----------------+--------------------+-------------+-------------+--------------
1000 | Compress Chunks Background Job | compress_chunks | @ 15 days 12 hours | @ 0 | -1 | @ 1 hour
(1 row)
select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s');
WARNING: Timescale License expired
job_id | schedule_interval | max_runtime | max_retries | retry_period
--------+-------------------+-------------+-------------+--------------
1000 | @ 1 sec | @ 0 | -1 | @ 1 hour
(1 row)
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period
------+--------------------------------+-----------------+-------------------+-------------+-------------+--------------
1000 | Compress Chunks Background Job | compress_chunks | @ 1 sec | @ 0 | -1 | @ 1 hour
(1 row)
insert into conditions
select generate_series(now()::timestamp, now()::timestamp+'1day', '1 min'), 'TOK', 'sony', 55, 75;
-- TEST3 --
--only the old chunks will get compressed when policy is executed--
select test_compress_chunks_policy(:compressjob_id);
test_compress_chunks_policy
-----------------------------
(1 row)
select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size order by chunk_name;
hypertable_name | chunk_name | uncompressed_total_bytes | compressed_total_bytes
-----------------+----------------------------------------+--------------------------+------------------------
conditions | _timescaledb_internal._hyper_1_1_chunk | 32 kB | 16 kB
(1 row)
-- TEST 4 --
--cannot set another policy
\set ON_ERROR_STOP 0
select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true);
NOTICE: compress chunks policy already exists on hypertable "conditions", skipping
add_compress_chunks_policy
----------------------------
-1
(1 row)
select add_compress_chunks_policy('conditions', '60d'::interval);
ERROR: compress chunks policy already exists for hypertable "conditions"
select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true);
WARNING: could not add compress_chunks policy due to existing policy on hypertable with different arguments
add_compress_chunks_policy
----------------------------
-1
(1 row)
\set ON_ERROR_STOP 1
--TEST 5 --
-- drop the policy --
select remove_compress_chunks_policy('conditions');
remove_compress_chunks_policy
-------------------------------
t
(1 row)
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
compressjob_id | hypertable_id | older_than
----------------+---------------+------------
(0 rows)
--TEST 6 --
-- try to execute the policy after it has been dropped --
\set ON_ERROR_STOP 0
select test_compress_chunks_policy(:compressjob_id);
ERROR: bgw job not found
\set ON_ERROR_STOP 1
--TEST 7
--compress chunks policy for integer based partition hypertable
CREATE TABLE test_table_int(time bigint, val int);
SELECT create_hypertable('test_table_int', 'time', chunk_time_interval => 1);
NOTICE: adding not-null constraint to column "time"
create_hypertable
-----------------------------
(3,public,test_table_int,t)
(1 row)
create or replace function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 5::BIGINT';
select set_integer_now_func('test_table_int', 'dummy_now');
set_integer_now_func
----------------------
(1 row)
insert into test_table_int select generate_series(1,5), 10;
alter table test_table_int set (timescaledb.compress);
select add_compress_chunks_policy('test_table_int', 2::int);
add_compress_chunks_policy
----------------------------
1001
(1 row)
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks
where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int');
compressjob_id | hypertable_id | older_than
----------------+---------------+------------
1001 | 3 | (f,,2)
(1 row)
\gset
select test_compress_chunks_policy(:compressjob_id);
test_compress_chunks_policy
-----------------------------
(1 row)
select test_compress_chunks_policy(:compressjob_id);
test_compress_chunks_policy
-----------------------------
(1 row)
select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size
where hypertable_name::text like 'test_table_int'
order by chunk_name;
hypertable_name | chunk_name | uncompressed_total_bytes | compressed_total_bytes
-----------------+----------------------------------------+--------------------------+------------------------
test_table_int | _timescaledb_internal._hyper_3_5_chunk | 24 kB | 16 kB
test_table_int | _timescaledb_internal._hyper_3_6_chunk | 24 kB | 16 kB
(2 rows)

View File

@ -31,7 +31,6 @@ insert into non_compressed values( 3 , 16 , 20, 4);
ALTER TABLE foo2 set (timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
ERROR: must set the 'compress' boolean option when setting compression options
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'c');
WARNING: Timescale License expired
ERROR: cannot use the same column c in compress_orderby and compress_segmentby
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd DESC');
ALTER TABLE foo2 set (timescaledb.compress, timescaledb.compress_segmentby = '"bacB toD",c' , timescaledb.compress_orderby = 'd');

View File

@ -38,7 +38,6 @@ NOTICE: adding not-null constraint to column "Time"
INSERT INTO test1 SELECT t, gen_rand_minstd(), gen_rand_minstd(), gen_rand_minstd()::text FROM generate_series('2018-03-02 1:00'::TIMESTAMPTZ, '2018-03-28 1:00', '1 hour') t;
ALTER TABLE test1 set (timescaledb.compress, timescaledb.compress_segmentby = '', timescaledb.compress_orderby = '"Time" DESC');
WARNING: Timescale License expired
SELECT
$$
SELECT * FROM test1 ORDER BY "Time"
@ -68,7 +67,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this
Number of rows different between original and query on compressed data (expect 0) | 0
(1 row)
psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired
count_decompressed
--------------------
27
@ -144,7 +142,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this
Number of rows different between original and query on compressed data (expect 0) | 0
(1 row)
psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired
count_decompressed
--------------------
5
@ -253,7 +250,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this
Number of rows different between original and query on compressed data (expect 0) | 0
(1 row)
psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired
count_decompressed
--------------------
1
@ -327,7 +323,6 @@ pg_dump: Consider using a full dump instead of a --data-only dump to avoid this
Number of rows different between original and query on compressed data (expect 0) | 0
(1 row)
psql:include/compression_test_hypertable.sql:41: WARNING: Timescale License expired
count_decompressed
--------------------
10

View File

@ -44,7 +44,6 @@ ANALYZE metrics_space;
\set ECHO none
-- compress first and last chunk on the hypertable
ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id');
WARNING: Timescale License expired
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
compress_chunk
----------------

View File

@ -44,7 +44,6 @@ ANALYZE metrics_space;
\set ECHO none
-- compress first and last chunk on the hypertable
ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id');
WARNING: Timescale License expired
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
compress_chunk
----------------

View File

@ -44,7 +44,6 @@ ANALYZE metrics_space;
\set ECHO none
-- compress first and last chunk on the hypertable
ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id');
WARNING: Timescale License expired
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
compress_chunk
----------------

View File

@ -19,6 +19,7 @@ set(TEST_FILES_DEBUG
compression_errors.sql
compression_hypertable.sql
compression_segment_meta.sql
compression_bgw.sql
continuous_aggs.sql
continuous_aggs_bgw.sql
continuous_aggs_materialize.sql

View File

@ -0,0 +1,89 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_internal.stop_background_workers();
SELECT _timescaledb_internal.enterprise_enabled();
CREATE OR REPLACE FUNCTION test_compress_chunks_policy(job_id INTEGER)
RETURNS VOID
AS :TSL_MODULE_PATHNAME, 'ts_test_auto_compress_chunks'
LANGUAGE C VOLATILE STRICT;
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
location TEXT NOT NULL,
location2 char(10) NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
select create_hypertable( 'conditions', 'time', chunk_time_interval=> '31days'::interval);
--TEST 1--
--cannot set policy without enabling compression --
\set ON_ERROR_STOP 0
select add_compress_chunks_policy('conditions', '60d'::interval);
\set ON_ERROR_STOP 1
-- TEST2 --
--add a policy to compress chunks --
alter table conditions set (timescaledb.compress, timescaledb.compress_segmentby = 'location', timescaledb.compress_orderby = 'time');
insert into conditions
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
select add_compress_chunks_policy('conditions', '60d'::interval);
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
\gset
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
select * from alter_job_schedule(:compressjob_id, schedule_interval=>'1s');
select * from _timescaledb_config.bgw_job where job_type like 'compress%';
insert into conditions
select generate_series(now()::timestamp, now()::timestamp+'1day', '1 min'), 'TOK', 'sony', 55, 75;
-- TEST3 --
--only the old chunks will get compressed when policy is executed--
select test_compress_chunks_policy(:compressjob_id);
select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size order by chunk_name;
-- TEST 4 --
--cannot set another policy
\set ON_ERROR_STOP 0
select add_compress_chunks_policy('conditions', '60d'::interval, if_not_exists=>true);
select add_compress_chunks_policy('conditions', '60d'::interval);
select add_compress_chunks_policy('conditions', '30d'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
--TEST 5 --
-- drop the policy --
select remove_compress_chunks_policy('conditions');
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks;
--TEST 6 --
-- try to execute the policy after it has been dropped --
\set ON_ERROR_STOP 0
select test_compress_chunks_policy(:compressjob_id);
\set ON_ERROR_STOP 1
--TEST 7
--compress chunks policy for integer based partition hypertable
CREATE TABLE test_table_int(time bigint, val int);
SELECT create_hypertable('test_table_int', 'time', chunk_time_interval => 1);
create or replace function dummy_now() returns BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 5::BIGINT';
select set_integer_now_func('test_table_int', 'dummy_now');
insert into test_table_int select generate_series(1,5), 10;
alter table test_table_int set (timescaledb.compress);
select add_compress_chunks_policy('test_table_int', 2::int);
select job_id as compressjob_id, hypertable_id, older_than from _timescaledb_config.bgw_policy_compress_chunks
where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name like 'test_table_int');
\gset
select test_compress_chunks_policy(:compressjob_id);
select test_compress_chunks_policy(:compressjob_id);
select hypertable_name, chunk_name, uncompressed_total_bytes, compressed_total_bytes from timescaledb_information.compressed_chunk_size
where hypertable_name::text like 'test_table_int'
order by chunk_name;

View File

@ -11,6 +11,7 @@
#include <access/htup_details.h>
#include "bgw_policy/job.h"
#include "bgw/job_stat.h"
#include "chunk.h"
#include "reorder.h"
@ -18,6 +19,7 @@
TS_FUNCTION_INFO_V1(ts_test_auto_reorder);
TS_FUNCTION_INFO_V1(ts_test_auto_drop_chunks);
TS_FUNCTION_INFO_V1(ts_test_auto_compress_chunks);
static Oid chunk_oid;
static Oid index_oid;
@ -67,3 +69,15 @@ ts_test_auto_drop_chunks(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
/* Call the real compress_chunks policy */
Datum
ts_test_auto_compress_chunks(PG_FUNCTION_ARGS)
{
int32 job_id = PG_GETARG_INT32(0);
BgwJob *job = ts_bgw_job_find(job_id, CurrentMemoryContext, true);
/*since we didn't come through the scheduler, need to mark job
* as started to create a job_stat record */
ts_bgw_job_stat_mark_start(job_id);
return execute_compress_chunks_policy(job);
}