Add policy_recompression procedure

This patch adds a recompression procedure that can be used as a custom
job when compression and recompression should run as separate
background jobs.
Sven Klemm 2021-05-22 23:24:28 +02:00 committed by gayyappan
parent 426918c59f
commit fe872cb684
14 changed files with 418 additions and 12 deletions
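A hedged usage sketch (not part of the patch itself; the hypertable_id and intervals are illustrative): with the new procedure, compression and recompression can be registered as two separate custom jobs via add_job, with recompression switched off in the compression job's config. This mirrors the regression test below, which sets the "recompress" key via jsonb_set after creating the job; here it is assumed the key can equally be supplied directly in the add_job config.

-- compression-only job: "recompress": false keeps this job from touching
-- already-compressed chunks that later received new rows
SELECT add_job('_timescaledb_internal.policy_compression', '1w',
  '{"hypertable_id": 12, "compress_after": "@ 7 days", "recompress": false}');
-- separate recompression job for those chunks
SELECT add_job('_timescaledb_internal.policy_recompression', '1w',
  '{"hypertable_id": 12, "recompress_after": "@ 7 days"}');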

View File

@@ -14,6 +14,10 @@ CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression(job_id INTE
AS '@MODULE_PATHNAME@', 'ts_policy_compression_proc'
LANGUAGE C;
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_recompression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_recompression_proc'
LANGUAGE C;
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_refresh_continuous_aggregate(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc'
LANGUAGE C;

View File

@@ -24,6 +24,7 @@
CROSSMODULE_WRAPPER(policy_compression_add);
CROSSMODULE_WRAPPER(policy_compression_proc);
CROSSMODULE_WRAPPER(policy_compression_remove);
CROSSMODULE_WRAPPER(policy_recompression_proc);
CROSSMODULE_WRAPPER(policy_refresh_cagg_add);
CROSSMODULE_WRAPPER(policy_refresh_cagg_proc);
CROSSMODULE_WRAPPER(policy_refresh_cagg_remove);
@@ -309,6 +310,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.policy_compression_add = error_no_default_fn_pg_community,
.policy_compression_proc = error_no_default_fn_pg_community,
.policy_compression_remove = error_no_default_fn_pg_community,
.policy_recompression_proc = error_no_default_fn_pg_community,
.policy_refresh_cagg_add = error_no_default_fn_pg_community,
.policy_refresh_cagg_proc = error_no_default_fn_pg_community,
.policy_refresh_cagg_remove = error_no_default_fn_pg_community,

View File

@@ -44,6 +44,7 @@ typedef struct CrossModuleFunctions
PGFunction policy_compression_add;
PGFunction policy_compression_proc;
PGFunction policy_compression_remove;
PGFunction policy_recompression_proc;
PGFunction policy_refresh_cagg_add;
PGFunction policy_refresh_cagg_proc;
PGFunction policy_refresh_cagg_remove;

View File

@@ -931,12 +931,20 @@ ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_
return info.chunk_id;
}
typedef struct CompressChunkSearch
{
int32 chunk_id;
bool compress;
bool recompress;
} CompressChunkSearch;
static ScanTupleResult
dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *data)
{
ListCell *lc;
DimensionSlice *slice = dimension_slice_from_slot(ti->slot);
List *chunk_ids = NIL;
CompressChunkSearch *d = data;
ts_chunk_constraint_scan_by_dimension_slice_to_list(slice, &chunk_ids, CurrentMemoryContext);
@@ -944,12 +952,13 @@ dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *dat
{
int32 chunk_id = lfirst_int(lc);
ChunkCompressionStatus st = ts_chunk_get_compression_status(chunk_id);
if (st == CHUNK_COMPRESS_NONE || st == CHUNK_COMPRESS_UNORDERED)
if ((d->compress && st == CHUNK_COMPRESS_NONE) ||
(d->recompress && st == CHUNK_COMPRESS_UNORDERED))
{
/* found a chunk that is not compressed or needs recompress
* caller needs to check the correct chunk status
*/
*((int32 *) data) = chunk_id;
d->chunk_id = chunk_id;
return SCAN_DONE;
}
}
@@ -960,18 +969,20 @@
int32
ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id, StrategyNumber start_strategy,
int64 start_value, StrategyNumber end_strategy,
int64 end_value)
int64 end_value, bool compress, bool recompress)
{
int32 chunk_id_ret = INVALID_CHUNK_ID;
CompressChunkSearch data = { .compress = compress,
.recompress = recompress,
.chunk_id = INVALID_CHUNK_ID };
dimension_slice_scan_with_strategies(dimension_id,
start_strategy,
start_value,
end_strategy,
end_value,
&chunk_id_ret,
&data,
dimension_slice_check_is_chunk_uncompressed_tuple_found,
-1,
NULL);
return chunk_id_ret;
return data.chunk_id;
}
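For reference, the new compress/recompress flags select chunks by compression status: compress targets chunks in state CHUNK_COMPRESS_NONE, recompress targets CHUNK_COMPRESS_UNORDERED. The regression tests below observe this indirectly through a chunk_status column; a sketch assuming the tests' compressed_chunk_info_view helper, where 1 is the fully compressed state and 3 is the compressed-but-received-new-rows state the recompress flag targets:

-- 1 after (re)compression, 3 once new rows land in an already-compressed chunk
SELECT chunk_status
FROM compressed_chunk_info_view
WHERE hypertable_name = 'metrics';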

View File

@@ -77,11 +77,9 @@ extern TSDLLEXPORT int
ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
StrategyNumber start_strategy, int64 start_value,
StrategyNumber end_strategy, int64 end_value);
extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id,
StrategyNumber start_strategy,
int64 start_value,
StrategyNumber end_strategy,
int64 end_value);
extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(
int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
StrategyNumber end_strategy, int64 end_value, bool compress, bool recompress);
#define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1)
#define dimension_slice_scan(dimension_id, coordinate, tuplock) \

View File

@@ -171,6 +171,24 @@ ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key, bool *field_found)
return DatumGetTimestampTz(time_datum);
}
bool
ts_jsonb_get_bool_field(const Jsonb *json, const char *key, bool *field_found)
{
Datum bool_datum;
char *bool_str = ts_jsonb_get_str_field(json, key);
if (bool_str == NULL)
{
*field_found = false;
return false;
}
bool_datum = DirectFunctionCall1(boolin, CStringGetDatum(bool_str));
*field_found = true;
return DatumGetBool(bool_datum);
}
int32
ts_jsonb_get_int32_field(const Jsonb *json, const char *key, bool *field_found)
{

View File

@@ -31,6 +31,8 @@ extern TSDLLEXPORT char *ts_jsonb_get_str_field(const Jsonb *jsonb, const char *
extern TSDLLEXPORT Interval *ts_jsonb_get_interval_field(const Jsonb *jsonb, const char *key);
extern TSDLLEXPORT TimestampTz ts_jsonb_get_time_field(const Jsonb *jsonb, const char *key,
bool *field_found);
extern TSDLLEXPORT bool ts_jsonb_get_bool_field(const Jsonb *json, const char *key,
bool *field_found);
extern TSDLLEXPORT int32 ts_jsonb_get_int32_field(const Jsonb *json, const char *key,
bool *field_found);
extern TSDLLEXPORT int64 ts_jsonb_get_int64_field(const Jsonb *json, const char *key,

View File

@@ -37,8 +37,20 @@
DatumGetIntervalP(DirectFunctionCall3(interval_in, CStringGetDatum("1 hour"), InvalidOid, -1))
#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_RECOMPRESSION_PROC_NAME "policy_recompression"
#define CONFIG_KEY_HYPERTABLE_ID "hypertable_id"
#define CONFIG_KEY_COMPRESS_AFTER "compress_after"
#define CONFIG_KEY_RECOMPRESS_AFTER "recompress_after"
#define CONFIG_KEY_RECOMPRESS "recompress"
bool
policy_compression_get_recompress(const Jsonb *config)
{
bool found;
bool recompress = ts_jsonb_get_bool_field(config, CONFIG_KEY_RECOMPRESS, &found);
return found ? recompress : true;
}
int32
policy_compression_get_hypertable_id(const Jsonb *config)
@@ -81,6 +93,33 @@ policy_compression_get_compress_after_interval(const Jsonb *config)
return interval;
}
int64
policy_recompression_get_recompress_after_int(const Jsonb *config)
{
bool found;
int64 compress_after = ts_jsonb_get_int64_field(config, CONFIG_KEY_RECOMPRESS_AFTER, &found);
if (!found)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for job", CONFIG_KEY_RECOMPRESS_AFTER)));
return compress_after;
}
Interval *
policy_recompression_get_recompress_after_interval(const Jsonb *config)
{
Interval *interval = ts_jsonb_get_interval_field(config, CONFIG_KEY_RECOMPRESS_AFTER);
if (interval == NULL)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for job", CONFIG_KEY_RECOMPRESS_AFTER)));
return interval;
}
Datum
policy_compression_proc(PG_FUNCTION_ARGS)
{
@@ -94,6 +133,19 @@ policy_compression_proc(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
Datum
policy_recompression_proc(PG_FUNCTION_ARGS)
{
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
PG_RETURN_VOID();
TS_PREVENT_FUNC_IF_READ_ONLY();
policy_recompression_execute(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));
PG_RETURN_VOID();
}
Datum
policy_compression_add(PG_FUNCTION_ARGS)
{

View File

@@ -16,8 +16,13 @@ extern Datum policy_compression_add(PG_FUNCTION_ARGS);
extern Datum policy_compression_remove(PG_FUNCTION_ARGS);
extern Datum policy_compression_proc(PG_FUNCTION_ARGS);
extern Datum policy_recompression_proc(PG_FUNCTION_ARGS);
int32 policy_compression_get_hypertable_id(const Jsonb *config);
int64 policy_compression_get_compress_after_int(const Jsonb *config);
Interval *policy_compression_get_compress_after_interval(const Jsonb *config);
bool policy_compression_get_recompress(const Jsonb *config);
int64 policy_recompression_get_recompress_after_int(const Jsonb *config);
Interval *policy_recompression_get_recompress_after_interval(const Jsonb *config);
#endif /* TIMESCALEDB_TSL_BGW_POLICY_COMPRESSION_API_H */
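Since policy_compression_get_recompress defaults to true when the "recompress" key is absent, existing compression jobs keep recompressing unordered chunks unless the key is explicitly set to false. A sketch of toggling it on an existing job, mirroring the regression test below (:JOB_COMPRESS is the psql variable holding the job id):

-- disable recompression within the compression policy job
SELECT alter_job(id, config := jsonb_set(config, '{recompress}', 'false'))
FROM _timescaledb_config.bgw_job WHERE id = :JOB_COMPRESS;
-- re-enable it later
SELECT alter_job(id, config := jsonb_set(config, '{recompress}', 'true'))
FROM _timescaledb_config.bgw_job WHERE id = :JOB_COMPRESS;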

View File

@@ -132,6 +132,7 @@ get_chunk_to_compress(const Dimension *dim, const Jsonb *config)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
StrategyNumber end_strategy = BTLessStrategyNumber;
bool recompress = policy_compression_get_recompress(config);
Datum boundary = get_window_boundary(dim,
config,
@@ -143,7 +144,30 @@ get_chunk_to_compress(const Dimension *dim, const Jsonb *config)
-1, /*start_value*/
end_strategy,
ts_time_value_to_internal(boundary,
partitioning_type));
partitioning_type),
true,
recompress);
}
static int32
get_chunk_to_recompress(const Dimension *dim, const Jsonb *config)
{
Oid partitioning_type = ts_dimension_get_partition_type(dim);
StrategyNumber end_strategy = BTLessStrategyNumber;
Datum boundary = get_window_boundary(dim,
config,
policy_recompression_get_recompress_after_int,
policy_recompression_get_recompress_after_interval);
return ts_dimension_slice_get_chunkid_to_compress(dim->fd.id,
InvalidStrategy, /*start_strategy*/
-1, /*start_value*/
end_strategy,
ts_time_value_to_internal(boundary,
partitioning_type),
false,
true);
}
static void
@@ -551,6 +575,61 @@ policy_compression_read_and_validate_config(Jsonb *config, PolicyCompressionData
}
}
void
policy_recompression_read_and_validate_config(Jsonb *config, PolicyCompressionData *policy_data)
{
Oid table_relid = ts_hypertable_id_to_relid(policy_compression_get_hypertable_id(config));
Cache *hcache;
Hypertable *hypertable =
ts_hypertable_cache_get_cache_and_entry(table_relid, CACHE_FLAG_NONE, &hcache);
if (policy_data)
{
policy_data->hypertable = hypertable;
policy_data->hcache = hcache;
}
}
bool
policy_recompression_execute(int32 job_id, Jsonb *config)
{
int32 chunkid;
Dimension *dim;
PolicyCompressionData policy_data;
policy_recompression_read_and_validate_config(config, &policy_data);
dim = hyperspace_get_open_dimension(policy_data.hypertable->space, 0);
chunkid = get_chunk_to_recompress(dim, config);
if (chunkid == INVALID_CHUNK_ID)
elog(NOTICE,
"no chunks for hypertable \"%s.%s\" that satisfy recompress chunk policy",
policy_data.hypertable->fd.schema_name.data,
policy_data.hypertable->fd.table_name.data);
if (chunkid != INVALID_CHUNK_ID)
{
Chunk *chunk = ts_chunk_get_by_id(chunkid, true);
if (hypertable_is_distributed(policy_data.hypertable))
policy_invoke_recompress_chunk(chunk);
else
tsl_recompress_chunk_wrapper(chunk);
elog(LOG,
"completed recompressing chunk \"%s.%s\"",
NameStr(chunk->fd.schema_name),
NameStr(chunk->fd.table_name));
}
chunkid = get_chunk_to_recompress(dim, config);
if (chunkid != INVALID_CHUNK_ID)
enable_fast_restart(job_id, "recompression");
ts_cache_release(policy_data.hcache);
elog(DEBUG1, "job %d completed recompressing chunk", job_id);
return true;
}
static void
job_execute_function(FuncExpr *funcexpr)
{

View File

@@ -49,6 +49,7 @@ extern bool policy_reorder_execute(int32 job_id, Jsonb *config);
extern bool policy_retention_execute(int32 job_id, Jsonb *config);
extern bool policy_refresh_cagg_execute(int32 job_id, Jsonb *config);
extern bool policy_compression_execute(int32 job_id, Jsonb *config);
extern bool policy_recompression_execute(int32 job_id, Jsonb *config);
extern void policy_reorder_read_and_validate_config(Jsonb *config, PolicyReorderData *policy_data);
extern void policy_retention_read_and_validate_config(Jsonb *config,
PolicyRetentionData *policy_data);
@@ -56,6 +57,8 @@ extern void policy_refresh_cagg_read_and_validate_config(Jsonb *config,
PolicyContinuousAggData *policy_data);
extern void policy_compression_read_and_validate_config(Jsonb *config,
PolicyCompressionData *policy_data);
extern void policy_recompression_read_and_validate_config(Jsonb *config,
PolicyCompressionData *policy_data);
extern bool job_execute(BgwJob *job);
#endif /* TIMESCALEDB_TSL_BGW_POLICY_JOB_H */
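policy_recompression_execute handles at most one chunk per invocation and schedules a fast restart when another qualifying chunk remains. A sketch of driving it once through the scheduler interface, as the regression test does (:JOB_RECOMPRESS holds the id returned by add_job; a NOTICE is emitted when no chunk qualifies):

-- run the recompression job once
CALL run_job(:JOB_RECOMPRESS);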

View File

@@ -91,6 +91,7 @@ CrossModuleFunctions tsl_cm_functions = {
.policy_compression_add = policy_compression_add,
.policy_compression_proc = policy_compression_proc,
.policy_compression_remove = policy_compression_remove,
.policy_recompression_proc = policy_recompression_proc,
.policy_refresh_cagg_add = policy_refresh_cagg_add,
.policy_refresh_cagg_proc = policy_refresh_cagg_proc,
.policy_refresh_cagg_remove = policy_refresh_cagg_remove,

View File

@@ -378,3 +378,145 @@ SELECT decompress_chunk(:'CHUNK_NAME'::regclass);
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:100: ERROR: call compress_chunk instead of recompress_chunk
\set ON_ERROR_STOP 1
-- test recompress policy
CREATE TABLE metrics(time timestamptz NOT NULL);
SELECT create_hypertable('metrics','time');
create_hypertable
-----------------------
(12,public,metrics,t)
(1 row)
ALTER TABLE metrics SET (timescaledb.compress);
-- create chunk with some data and compress
INSERT INTO metrics SELECT '2000-01-01' FROM generate_series(1,10);
-- create custom compression job without recompress boolean
SELECT add_job('_timescaledb_internal.policy_compression','1w','{"hypertable_id": 12, "compress_after": "@ 7 days"}') AS "JOB_COMPRESS" \gset
-- first call should compress
CALL run_job(:JOB_COMPRESS);
-- 2nd call should do nothing
CALL run_job(:JOB_COMPRESS);
psql:include/recompress_basic.sql:117: NOTICE: no chunks for hypertable public.metrics that satisfy compress chunk policy
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
-- do an INSERT so recompress has something to do
INSERT INTO metrics SELECT '2000-01-01';
---- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
3
(1 row)
-- should recompress
CALL run_job(:JOB_COMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
-- disable recompress in compress job
SELECT alter_job(id,config:=jsonb_set(config,'{recompress}','false')) FROM _timescaledb_config.bgw_job WHERE id = :JOB_COMPRESS;
alter_job
--------------------------------------------------------------------------------------------------------------------------------------
(1004,"@ 7 days","@ 0",-1,"@ 5 mins",t,"{""recompress"": false, ""hypertable_id"": 12, ""compress_after"": ""@ 7 days""}",-infinity)
(1 row)
-- nothing to do
CALL run_job(:JOB_COMPRESS);
psql:include/recompress_basic.sql:138: NOTICE: no chunks for hypertable public.metrics that satisfy compress chunk policy
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
-- do an INSERT so recompress has something to do
INSERT INTO metrics SELECT '2000-01-01';
---- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
3
(1 row)
-- still nothing to do since we disabled recompress
CALL run_job(:JOB_COMPRESS);
psql:include/recompress_basic.sql:150: NOTICE: no chunks for hypertable public.metrics that satisfy compress chunk policy
---- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
3
(1 row)
-- reenable recompress in compress job
SELECT alter_job(id,config:=jsonb_set(config,'{recompress}','true')) FROM _timescaledb_config.bgw_job WHERE id = :JOB_COMPRESS;
alter_job
-------------------------------------------------------------------------------------------------------------------------------------
(1004,"@ 7 days","@ 0",-1,"@ 5 mins",t,"{""recompress"": true, ""hypertable_id"": 12, ""compress_after"": ""@ 7 days""}",-infinity)
(1 row)
-- should recompress now
CALL run_job(:JOB_COMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
SELECT delete_job(:JOB_COMPRESS);
delete_job
------------
(1 row)
SELECT add_job('_timescaledb_internal.policy_recompression','1w','{"hypertable_id": 12, "recompress_after": "@ 7 days"}') AS "JOB_RECOMPRESS" \gset
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
---- nothing to do yet
CALL run_job(:JOB_RECOMPRESS);
psql:include/recompress_basic.sql:172: NOTICE: no chunks for hypertable "public.metrics" that satisfy recompress chunk policy
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
-- create some work for recompress
INSERT INTO metrics SELECT '2000-01-01';
-- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
3
(1 row)
CALL run_job(:JOB_RECOMPRESS);
-- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
--------------
1
(1 row)
SELECT delete_job(:JOB_RECOMPRESS);
delete_job
------------
(1 row)

View File

@@ -99,3 +99,91 @@ SELECT recompress_chunk(:'CHUNK_NAME'::regclass, false);
SELECT decompress_chunk(:'CHUNK_NAME'::regclass);
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
\set ON_ERROR_STOP 1
-- test recompress policy
CREATE TABLE metrics(time timestamptz NOT NULL);
SELECT create_hypertable('metrics','time');
ALTER TABLE metrics SET (timescaledb.compress);
-- create chunk with some data and compress
INSERT INTO metrics SELECT '2000-01-01' FROM generate_series(1,10);
-- create custom compression job without recompress boolean
SELECT add_job('_timescaledb_internal.policy_compression','1w','{"hypertable_id": 12, "compress_after": "@ 7 days"}') AS "JOB_COMPRESS" \gset
-- first call should compress
CALL run_job(:JOB_COMPRESS);
-- 2nd call should do nothing
CALL run_job(:JOB_COMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- do an INSERT so recompress has something to do
INSERT INTO metrics SELECT '2000-01-01';
---- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- should recompress
CALL run_job(:JOB_COMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- disable recompress in compress job
SELECT alter_job(id,config:=jsonb_set(config,'{recompress}','false')) FROM _timescaledb_config.bgw_job WHERE id = :JOB_COMPRESS;
-- nothing to do
CALL run_job(:JOB_COMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- do an INSERT so recompress has something to do
INSERT INTO metrics SELECT '2000-01-01';
---- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- still nothing to do since we disabled recompress
CALL run_job(:JOB_COMPRESS);
---- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- reenable recompress in compress job
SELECT alter_job(id,config:=jsonb_set(config,'{recompress}','true')) FROM _timescaledb_config.bgw_job WHERE id = :JOB_COMPRESS;
-- should recompress now
CALL run_job(:JOB_COMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
SELECT delete_job(:JOB_COMPRESS);
SELECT add_job('_timescaledb_internal.policy_recompression','1w','{"hypertable_id": 12, "recompress_after": "@ 7 days"}') AS "JOB_RECOMPRESS" \gset
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
---- nothing to do yet
CALL run_job(:JOB_RECOMPRESS);
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
-- create some work for recompress
INSERT INTO metrics SELECT '2000-01-01';
-- status should be 3
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
CALL run_job(:JOB_RECOMPRESS);
-- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
SELECT delete_job(:JOB_RECOMPRESS);