mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-14 17:43:34 +08:00
Copy scheduled_jobs list before sorting it
The start_scheduled_jobs function mistakenly sorts the scheduled_jobs list in-place. As a result, when the ts_update_scheduled_jobs_list function compares the updated list of scheduled jobs with the existing scheduled jobs list, it is comparing a list that is sorted by job_id to one that is sorted by next_start time. Fix that by properly copying the scheduled_jobs list into a new list and using that copy for sorting. Fixes #5537
This commit is contained in:
parent
b10139ba48
commit
a383c8dd4f
@ -37,6 +37,7 @@ accidentally triggering the load of a previous DB version.**
|
|||||||
* #5544 Fix refresh from beginning of Continuous Aggregate with variable time bucket
|
* #5544 Fix refresh from beginning of Continuous Aggregate with variable time bucket
|
||||||
* #5556 Fix duplicated entries on timescaledb_experimental.policies view
|
* #5556 Fix duplicated entries on timescaledb_experimental.policies view
|
||||||
* #5433 Fix join rte in CAggs with joins
|
* #5433 Fix join rte in CAggs with joins
|
||||||
|
* #5543 Copy scheduled_jobs list before sorting it
|
||||||
|
|
||||||
**Thanks**
|
**Thanks**
|
||||||
* @nikolaps for reporting an issue with the COPY fetcher
|
* @nikolaps for reporting an issue with the COPY fetcher
|
||||||
|
@ -575,7 +575,7 @@ start_scheduled_jobs(register_background_worker_callback_type bgw_register)
|
|||||||
ordered_scheduled_jobs = list_qsort(scheduled_jobs, cmp_next_start);
|
ordered_scheduled_jobs = list_qsort(scheduled_jobs, cmp_next_start);
|
||||||
#else
|
#else
|
||||||
/* PG13 does in-place sort */
|
/* PG13 does in-place sort */
|
||||||
ordered_scheduled_jobs = scheduled_jobs;
|
ordered_scheduled_jobs = list_copy(scheduled_jobs);
|
||||||
list_sort(ordered_scheduled_jobs, cmp_next_start);
|
list_sort(ordered_scheduled_jobs, cmp_next_start);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
@ -588,9 +588,7 @@ start_scheduled_jobs(register_background_worker_callback_type bgw_register)
|
|||||||
scheduled_ts_bgw_job_start(sjob, bgw_register);
|
scheduled_ts_bgw_job_start(sjob, bgw_register);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if PG13_LT
|
|
||||||
list_free(ordered_scheduled_jobs);
|
list_free(ordered_scheduled_jobs);
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns the earliest time the scheduler should start a job that is waiting to be started */
|
/* Returns the earliest time the scheduler should start a job that is waiting to be started */
|
||||||
|
@ -971,3 +971,88 @@ SELECT count(*) = 0
|
|||||||
|
|
||||||
-- cleanup
|
-- cleanup
|
||||||
DROP TABLE sensor_data;
|
DROP TABLE sensor_data;
|
||||||
|
-- Github issue #5537
|
||||||
|
-- Proc that waits until the given job enters the expected state
-- Polls timescaledb_information.job_stats up to `spins` times, sleeping 0.1s
-- after each unsuccessful poll, and returns as soon as the job_status of job
-- `job_param_id` equals `expected_status`. Raises an exception if that status
-- is not observed within `spins` polls.
-- `spins` defaults to the psql variable TEST_SPINWAIT_ITERS.
-- NOTE(review): the ROLLBACK after each poll presumably ends the current
-- transaction so that the next poll sees fresh job state - confirm.
|
||||||
|
CREATE OR REPLACE PROCEDURE wait_for_job_status(job_param_id INTEGER, expected_status TEXT, spins INTEGER=:TEST_SPINWAIT_ITERS)
|
||||||
|
LANGUAGE PLPGSQL AS $$
|
||||||
|
DECLARE
|
||||||
|
jobstatus TEXT;
|
||||||
|
BEGIN
|
||||||
|
FOR i in 1..spins
|
||||||
|
LOOP
|
||||||
|
SELECT job_status FROM timescaledb_information.job_stats WHERE job_id = job_param_id INTO jobstatus;
|
||||||
|
IF jobstatus = expected_status THEN
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
PERFORM pg_sleep(0.1);
|
||||||
|
ROLLBACK;
|
||||||
|
END LOOP;
|
||||||
|
RAISE EXCEPTION 'wait_for_job_status(%): timeout after % tries', job_param_id, spins;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
-- Proc that sleeps for 1m - to keep the test jobs in running state
-- The body only calls pg_sleep(60); neither job_id nor config is read.
-- NOTE(review): the (job_id, config) signature presumably matches what
-- add_job expects of a job procedure - confirm.
|
||||||
|
CREATE OR REPLACE PROCEDURE proc_that_sleeps(job_id INT, config JSONB)
|
||||||
|
LANGUAGE PLPGSQL AS
|
||||||
|
$$
|
||||||
|
BEGIN
|
||||||
|
PERFORM pg_sleep(60);
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
-- create new jobs and ensure that the second one gets scheduled
|
||||||
|
-- before the first one by adjusting the initial_start values
|
||||||
|
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz + interval '2s') AS job_id_1 \gset
|
||||||
|
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz - interval '2s') AS job_id_2 \gset
|
||||||
|
-- wait for the jobs to start running; job_2 will start running first
|
||||||
|
CALL wait_for_job_status(:job_id_2, 'Running');
|
||||||
|
CALL wait_for_job_status(:job_id_1, 'Running');
|
||||||
|
-- add a new job and wait for it to start
|
||||||
|
SELECT add_job('proc_that_sleeps', '1h') AS job_id_3 \gset
|
||||||
|
CALL wait_for_job_status(:job_id_3, 'Running');
|
||||||
|
-- verify that none of the jobs crashed
|
||||||
|
SELECT job_id, job_status, next_start,
|
||||||
|
total_runs, total_successes, total_failures
|
||||||
|
FROM timescaledb_information.job_stats
|
||||||
|
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3)
|
||||||
|
ORDER BY job_id;
|
||||||
|
job_id | job_status | next_start | total_runs | total_successes | total_failures
|
||||||
|
--------+------------+------------+------------+-----------------+----------------
|
||||||
|
1015 | Running | -infinity | 1 | 0 | 0
|
||||||
|
1016 | Running | -infinity | 1 | 0 | 0
|
||||||
|
1017 | Running | -infinity | 1 | 0 | 0
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT job_id, err_message
|
||||||
|
FROM timescaledb_information.job_errors
|
||||||
|
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3);
|
||||||
|
job_id | err_message
|
||||||
|
--------+-------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- cleanup
|
||||||
|
SELECT _timescaledb_internal.stop_background_workers();
|
||||||
|
stop_background_workers
|
||||||
|
-------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CALL wait_for_job_status(:job_id_1, 'Scheduled');
|
||||||
|
CALL wait_for_job_status(:job_id_2, 'Scheduled');
|
||||||
|
CALL wait_for_job_status(:job_id_3, 'Scheduled');
|
||||||
|
SELECT delete_job(:job_id_1);
|
||||||
|
delete_job
|
||||||
|
------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT delete_job(:job_id_2);
|
||||||
|
delete_job
|
||||||
|
------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT delete_job(:job_id_3);
|
||||||
|
delete_job
|
||||||
|
------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
@ -626,3 +626,65 @@ SELECT count(*) = 0
|
|||||||
|
|
||||||
-- cleanup
|
-- cleanup
|
||||||
DROP TABLE sensor_data;
|
DROP TABLE sensor_data;
|
||||||
|
|
||||||
|
-- Github issue #5537
|
||||||
|
-- Proc that waits until the given job enters the expected state
-- Polls timescaledb_information.job_stats up to `spins` times, sleeping 0.1s
-- after each unsuccessful poll, and returns as soon as the job_status of job
-- `job_param_id` equals `expected_status`. Raises an exception if that status
-- is not observed within `spins` polls.
-- `spins` defaults to the psql variable TEST_SPINWAIT_ITERS.
-- NOTE(review): the ROLLBACK after each poll presumably ends the current
-- transaction so that the next poll sees fresh job state - confirm.
|
||||||
|
CREATE OR REPLACE PROCEDURE wait_for_job_status(job_param_id INTEGER, expected_status TEXT, spins INTEGER=:TEST_SPINWAIT_ITERS)
|
||||||
|
LANGUAGE PLPGSQL AS $$
|
||||||
|
DECLARE
|
||||||
|
jobstatus TEXT;
|
||||||
|
BEGIN
|
||||||
|
FOR i in 1..spins
|
||||||
|
LOOP
|
||||||
|
SELECT job_status FROM timescaledb_information.job_stats WHERE job_id = job_param_id INTO jobstatus;
|
||||||
|
IF jobstatus = expected_status THEN
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
PERFORM pg_sleep(0.1);
|
||||||
|
ROLLBACK;
|
||||||
|
END LOOP;
|
||||||
|
RAISE EXCEPTION 'wait_for_job_status(%): timeout after % tries', job_param_id, spins;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
-- Proc that sleeps for 1m - to keep the test jobs in running state
-- The body only calls pg_sleep(60); neither job_id nor config is read.
-- NOTE(review): the (job_id, config) signature presumably matches what
-- add_job expects of a job procedure - confirm.
|
||||||
|
CREATE OR REPLACE PROCEDURE proc_that_sleeps(job_id INT, config JSONB)
|
||||||
|
LANGUAGE PLPGSQL AS
|
||||||
|
$$
|
||||||
|
BEGIN
|
||||||
|
PERFORM pg_sleep(60);
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|
||||||
|
-- create new jobs and ensure that the second one gets scheduled
|
||||||
|
-- before the first one by adjusting the initial_start values
|
||||||
|
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz + interval '2s') AS job_id_1 \gset
|
||||||
|
SELECT add_job('proc_that_sleeps', '1h', initial_start => now()::timestamptz - interval '2s') AS job_id_2 \gset
|
||||||
|
|
||||||
|
-- wait for the jobs to start running; job_2 will start running first
|
||||||
|
CALL wait_for_job_status(:job_id_2, 'Running');
|
||||||
|
CALL wait_for_job_status(:job_id_1, 'Running');
|
||||||
|
|
||||||
|
-- add a new job and wait for it to start
|
||||||
|
SELECT add_job('proc_that_sleeps', '1h') AS job_id_3 \gset
|
||||||
|
CALL wait_for_job_status(:job_id_3, 'Running');
|
||||||
|
|
||||||
|
-- verify that none of the jobs crashed
|
||||||
|
SELECT job_id, job_status, next_start,
|
||||||
|
total_runs, total_successes, total_failures
|
||||||
|
FROM timescaledb_information.job_stats
|
||||||
|
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3)
|
||||||
|
ORDER BY job_id;
|
||||||
|
SELECT job_id, err_message
|
||||||
|
FROM timescaledb_information.job_errors
|
||||||
|
WHERE job_id IN (:job_id_1, :job_id_2, :job_id_3);
|
||||||
|
|
||||||
|
-- cleanup
|
||||||
|
SELECT _timescaledb_internal.stop_background_workers();
|
||||||
|
CALL wait_for_job_status(:job_id_1, 'Scheduled');
|
||||||
|
CALL wait_for_job_status(:job_id_2, 'Scheduled');
|
||||||
|
CALL wait_for_job_status(:job_id_3, 'Scheduled');
|
||||||
|
SELECT delete_job(:job_id_1);
|
||||||
|
SELECT delete_job(:job_id_2);
|
||||||
|
SELECT delete_job(:job_id_3);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user