Fix reorder policy job to skip compressed chunks
The reorder policy does not skip compressed chunks when selecting the next chunk to reorder. This causes an error during job execution, since it is not possible to reorder a compressed chunk. With this fix, the job excludes compressed chunks from the selection. Fixes #1810
commit b368563e3f
parent 853e37efbf
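To make the change concrete, here is a minimal sketch (not part of the commit) of the per-chunk check the reorder policy now applies while scanning for the next chunk: the chunk must have no recorded reorder run for the job and must still be uncompressed. It reuses ts_bgw_policy_chunk_stats_find() and ts_chunk_can_be_compressed() exactly as they appear in the diff below; the standalone helper and its name are hypothetical.

/*
 * Illustrative sketch only -- not part of the commit. Assumes TimescaleDB's
 * internal declarations (BgwPolicyChunkStats, ts_bgw_policy_chunk_stats_find,
 * ts_chunk_can_be_compressed) are in scope; the helper name is hypothetical.
 */
static bool
chunk_is_valid_reorder_target(int32 job_id, int32 chunk_id)
{
    /* No recorded run of this reorder job for the chunk ... */
    BgwPolicyChunkStats *chunk_stat = ts_bgw_policy_chunk_stats_find(job_id, chunk_id);
    bool never_reordered = (chunk_stat == NULL || chunk_stat->fd.num_times_job_run == 0);

    /*
     * ... and the chunk is still uncompressed (and not dropped).
     * ts_chunk_can_be_compressed() is true only for such chunks, so it
     * doubles as the "skip compressed chunks" filter introduced by the fix.
     */
    return never_reordered && ts_chunk_can_be_compressed(chunk_id);
}

The selection function, renamed below to ts_dimension_slice_oldest_valid_chunk_for_reorder, returns the oldest chunk in the policy's range that passes this check.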
@@ -732,6 +732,8 @@ typedef struct ChunkStatInfo
     int32 job_id;
 } ChunkStatInfo;
 
+/* Check that a a) job has not already been executed for the chunk and b) chunk is not compressed
+ * (a compressed chunk should not be reordered).*/
 static ScanTupleResult
 dimension_slice_check_chunk_stats_tuple_found(TupleInfo *ti, void *data)
 {
@@ -744,13 +746,16 @@ dimension_slice_check_chunk_stats_tuple_found(TupleInfo *ti, void *data)
     foreach (lc, chunk_ids)
     {
-        BgwPolicyChunkStats *chunk_stat =
-            ts_bgw_policy_chunk_stats_find(info->job_id, lfirst_int(lc));
+        /* Look for a chunk that a) doesn't have a job stat (reorder) and b) is not compressed
+         * (should not reorder a compressed chunk) */
+        int chunk_id = lfirst_int(lc);
+        BgwPolicyChunkStats *chunk_stat = ts_bgw_policy_chunk_stats_find(info->job_id, chunk_id);
 
-        if (chunk_stat == NULL || chunk_stat->fd.num_times_job_run == 0)
+        if ((chunk_stat == NULL || chunk_stat->fd.num_times_job_run == 0) &&
+            ts_chunk_can_be_compressed(chunk_id))
         {
             /* Save the chunk_id */
-            info->chunk_id = lfirst_int(lc);
+            info->chunk_id = chunk_id;
             return SCAN_DONE;
         }
     }
@@ -759,10 +764,9 @@ dimension_slice_check_chunk_stats_tuple_found(TupleInfo *ti, void *data)
 }
 
 int
-ts_dimension_slice_oldest_chunk_without_executed_job(int32 job_id, int32 dimension_id,
-                                                     StrategyNumber start_strategy,
-                                                     int64 start_value, StrategyNumber end_strategy,
-                                                     int64 end_value)
+ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
+                                                  StrategyNumber start_strategy, int64 start_value,
+                                                  StrategyNumber end_strategy, int64 end_value)
 {
     ChunkStatInfo info = {
         .job_id = job_id,
@@ -71,9 +71,10 @@ extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSli
 extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord);
 
 extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_latest_slice(int32 dimension_id, int n);
-extern TSDLLEXPORT int ts_dimension_slice_oldest_chunk_without_executed_job(
-    int32 job_id, int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
-    StrategyNumber end_strategy, int64 end_value);
+extern TSDLLEXPORT int
+ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
+                                                  StrategyNumber start_strategy, int64 start_value,
+                                                  StrategyNumber end_strategy, int64 end_value);
 extern TSDLLEXPORT int32 ts_dimension_slice_get_chunkid_to_compress(int32 dimension_id,
                                                                     StrategyNumber start_strategy,
                                                                     int64 start_value,
@@ -55,8 +55,9 @@ enable_fast_restart(BgwJob *job, const char *job_name)
 /*
  * Returns the ID of a chunk to reorder. Eligible chunks must be at least the
  * 3rd newest chunk in the hypertable (not entirely exact because we use the number
- * of dimension slices as a proxy for the number of chunks) and hasn't been
- * reordered recently. For this version of automatic reordering, "not reordered
+ * of dimension slices as a proxy for the number of chunks),
+ * not compressed, not dropped and hasn't been reordered recently.
+ * For this version of automatic reordering, "not reordered
  * recently" means the chunk has not been reordered at all. This information
  * is available in the bgw_policy_chunk_stats metadata table.
  */
@@ -73,12 +74,12 @@ get_chunk_id_to_reorder(int32 job_id, Hypertable *ht)
 
     Assert(time_dimension != NULL);
 
-    return ts_dimension_slice_oldest_chunk_without_executed_job(job_id,
-                                                                time_dimension->fd.id,
-                                                                BTLessEqualStrategyNumber,
-                                                                nth_dimension->fd.range_start,
-                                                                InvalidStrategy,
-                                                                -1);
+    return ts_dimension_slice_oldest_valid_chunk_for_reorder(job_id,
+                                                             time_dimension->fd.id,
+                                                             BTLessEqualStrategyNumber,
+                                                             nth_dimension->fd.range_start,
+                                                             InvalidStrategy,
+                                                             -1);
 }
 
 static int32
@@ -173,3 +173,113 @@ WHERE uncomp_hyper.table_name like 'test_drop_chunks_table';
                       3
 (1 row)
 
+------------------------------
+-- Test reorder policy runs on compressed tables. Reorder policy job must skip compressed chunks
+-- (see issue https://github.com/timescale/timescaledb/issues/1810).
+-- More tests for reorder policy can be found at bgw_reorder_drop_chunks.sql
+------------------------------
+CREATE TABLE test_reorder_chunks_table(time int not null, chunk_id int);
+CREATE INDEX test_reorder_chunks_table_time_idx ON test_reorder_chunks_table(time);
+SELECT create_hypertable('test_reorder_chunks_table', 'time', chunk_time_interval => 1);
+            create_hypertable            
+-----------------------------------------
+ (3,public,test_reorder_chunks_table,t)
+(1 row)
+
+-- These inserts should create 6 different chunks
+INSERT INTO test_reorder_chunks_table VALUES (1, 1);
+INSERT INTO test_reorder_chunks_table VALUES (2, 2);
+INSERT INTO test_reorder_chunks_table VALUES (3, 3);
+INSERT INTO test_reorder_chunks_table VALUES (4, 4);
+INSERT INTO test_reorder_chunks_table VALUES (5, 5);
+INSERT INTO test_reorder_chunks_table VALUES (6, 6);
+-- Enable compression
+ALTER TABLE test_reorder_chunks_table set (timescaledb.compress, timescaledb.compress_orderby = 'time DESC');
+-- Compress 2 chunks:
+SELECT compress_chunk(show_chunks('test_reorder_chunks_table', newer_than => 2, older_than => 4));
+             compress_chunk              
+-----------------------------------------
+ _timescaledb_internal._hyper_3_12_chunk
+ _timescaledb_internal._hyper_3_13_chunk
+(2 rows)
+
+-- make sure we have total of 6 chunks:
+SELECT count(*) as count_chunks_uncompressed
+FROM _timescaledb_catalog.chunk chunk
+INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id)
+WHERE hypertable.table_name like 'test_reorder_chunks_table';
+ count_chunks_uncompressed 
+---------------------------
+                         6
+(1 row)
+
+-- and 2 compressed ones:
+SELECT count(*) as count_chunks_compressed
+FROM _timescaledb_catalog.chunk chunk
+INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
+INNER JOIN _timescaledb_catalog.hypertable uncomp_hyper ON (comp_hyper.id = uncomp_hyper.compressed_hypertable_id)
+WHERE uncomp_hyper.table_name like 'test_reorder_chunks_table';
+ count_chunks_compressed 
+-------------------------
+                       2
+(1 row)
+
+-- enable reorder policy
+SELECT add_reorder_policy('test_reorder_chunks_table', 'test_reorder_chunks_table_time_idx') AS reorder_job_id \gset
+-- nothing is clustered yet
+SELECT indexrelid::regclass, indisclustered
+FROM pg_index
+WHERE indisclustered = true ORDER BY 1;
+ indexrelid | indisclustered 
+------------+----------------
+(0 rows)
+
+-- run first time
+SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
+ ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish 
+------------------------------------------------------------
+ 
+(1 row)
+
+SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes
+FROM _timescaledb_internal.bgw_job_stat
+where job_id=:reorder_job_id;
+ job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes 
+--------+------------------+------------+-----------------+----------------+---------------
+   1001 | t                |          1 |               1 |              0 |             0
+(1 row)
+
+-- first chunk reordered
+SELECT indexrelid::regclass, indisclustered
+FROM pg_index
+WHERE indisclustered = true ORDER BY 1;
+                                 indexrelid                                  | indisclustered 
+-----------------------------------------------------------------------------+----------------
+ _timescaledb_internal._hyper_3_11_chunk_test_reorder_chunks_table_time_idx | t
+(1 row)
+
+-- second call to scheduler
+SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
+ ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish 
+------------------------------------------------------------
+ 
+(1 row)
+
+SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes
+FROM _timescaledb_internal.bgw_job_stat
+where job_id=:reorder_job_id;
+ job_id | last_run_success | total_runs | total_successes | total_failures | total_crashes 
+--------+------------------+------------+-----------------+----------------+---------------
+   1001 | t                |          2 |               2 |              0 |             0
+(1 row)
+
+-- two chunks clustered, skips the compressed chunks
+SELECT indexrelid::regclass, indisclustered
+FROM pg_index
+WHERE indisclustered = true ORDER BY 1;
+                                 indexrelid                                  | indisclustered 
+-----------------------------------------------------------------------------+----------------
+ _timescaledb_internal._hyper_3_11_chunk_test_reorder_chunks_table_time_idx | t
+ _timescaledb_internal._hyper_3_14_chunk_test_reorder_chunks_table_time_idx | t
+(2 rows)
@@ -58,7 +58,7 @@ if (${PG_VERSION_MAJOR} GREATER "9")
         compression_hypertable.sql
         compression_segment_meta.sql
         compression_bgw.sql
-        compress_bgw_drop_chunks.sql
+        compress_bgw_reorder_drop_chunks.sql
         transparent_decompression_queries.sql
     )
 
@@ -96,3 +96,75 @@ FROM _timescaledb_catalog.chunk chunk
 INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
 INNER JOIN _timescaledb_catalog.hypertable uncomp_hyper ON (comp_hyper.id = uncomp_hyper.compressed_hypertable_id)
 WHERE uncomp_hyper.table_name like 'test_drop_chunks_table';
+
+------------------------------
+-- Test reorder policy runs on compressed tables. Reorder policy job must skip compressed chunks
+-- (see issue https://github.com/timescale/timescaledb/issues/1810).
+-- More tests for reorder policy can be found at bgw_reorder_drop_chunks.sql
+------------------------------
+
+CREATE TABLE test_reorder_chunks_table(time int not null, chunk_id int);
+CREATE INDEX test_reorder_chunks_table_time_idx ON test_reorder_chunks_table(time);
+SELECT create_hypertable('test_reorder_chunks_table', 'time', chunk_time_interval => 1);
+
+-- These inserts should create 6 different chunks
+INSERT INTO test_reorder_chunks_table VALUES (1, 1);
+INSERT INTO test_reorder_chunks_table VALUES (2, 2);
+INSERT INTO test_reorder_chunks_table VALUES (3, 3);
+INSERT INTO test_reorder_chunks_table VALUES (4, 4);
+INSERT INTO test_reorder_chunks_table VALUES (5, 5);
+INSERT INTO test_reorder_chunks_table VALUES (6, 6);
+
+-- Enable compression
+ALTER TABLE test_reorder_chunks_table set (timescaledb.compress, timescaledb.compress_orderby = 'time DESC');
+
+-- Compress 2 chunks:
+SELECT compress_chunk(show_chunks('test_reorder_chunks_table', newer_than => 2, older_than => 4));
+
+-- make sure we have total of 6 chunks:
+SELECT count(*) as count_chunks_uncompressed
+FROM _timescaledb_catalog.chunk chunk
+INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id)
+WHERE hypertable.table_name like 'test_reorder_chunks_table';
+
+-- and 2 compressed ones:
+SELECT count(*) as count_chunks_compressed
+FROM _timescaledb_catalog.chunk chunk
+INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
+INNER JOIN _timescaledb_catalog.hypertable uncomp_hyper ON (comp_hyper.id = uncomp_hyper.compressed_hypertable_id)
+WHERE uncomp_hyper.table_name like 'test_reorder_chunks_table';
+
+-- enable reorder policy
+SELECT add_reorder_policy('test_reorder_chunks_table', 'test_reorder_chunks_table_time_idx') AS reorder_job_id \gset
+
+-- nothing is clustered yet
+SELECT indexrelid::regclass, indisclustered
+FROM pg_index
+WHERE indisclustered = true ORDER BY 1;
+
+-- run first time
+SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
+
+SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes
+FROM _timescaledb_internal.bgw_job_stat
+where job_id=:reorder_job_id;
+
+-- first chunk reordered
+SELECT indexrelid::regclass, indisclustered
+FROM pg_index
+WHERE indisclustered = true ORDER BY 1;
+
+-- second call to scheduler
+SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
+
+SELECT job_id, last_run_success, total_runs, total_successes, total_failures, total_crashes
+FROM _timescaledb_internal.bgw_job_stat
+where job_id=:reorder_job_id;
+
+-- two chunks clustered, skips the compressed chunks
+SELECT indexrelid::regclass, indisclustered
+FROM pg_index
+WHERE indisclustered = true ORDER BY 1;