From c0e193dd810d11e66d75a987b7b1291e6fe7a9f9 Mon Sep 17 00:00:00 2001 From: Mats Kindahl Date: Fri, 17 Jun 2022 13:10:46 +0200 Subject: [PATCH] Keep locks after reading job status When reading the job status table `bgw_job_stat` and after that updating it, locks were released after the read, allowing a competing session to update the job status and trigger a concurrent update error either in a session doing the update or in the scheduler. Since the scheduler does not recover after aborting with an error, this caused the background worker subsystem to stop and not start new jobs. This commit fixes this by upgrading `RowExclusiveLock` to `ShareRowExclusiveLock` to ensure that no two sessions try to update the row at the same time, removing an initial speculative lock that is taken when a job status row can be added, and also keeping the lock until the end of the transaction to prevent other sessions from updating. Since these updating transactions are short, they should not cause other sessions to block for long. 
Fixes #4293 --- src/bgw/job.c | 3 +- src/bgw/job_stat.c | 59 +++++++++++++++++------------------ src/bgw/scheduler.c | 3 ++ test/src/bgw/scheduler_mock.c | 4 +-- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/src/bgw/job.c b/src/bgw/job.c index 19570c140..66943576c 100644 --- a/src/bgw/job.c +++ b/src/bgw/job.c @@ -591,7 +591,6 @@ ts_bgw_job_find(int32 bgw_job_id, MemoryContext mctx, bool fail_if_not_found) elog(ERROR, "job %d not found", bgw_job_id); return job; - ; } static void @@ -1210,6 +1209,6 @@ ts_bgw_job_insert_relation(Name application_name, Interval *schedule_interval, ts_catalog_insert_values(rel, desc, values, nulls); ts_catalog_restore_user(&sec_ctx); - table_close(rel, RowExclusiveLock); + table_close(rel, NoLock); return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)]; } diff --git a/src/bgw/job_stat.c b/src/bgw/job_stat.c index 6e30b6144..206ddf794 100644 --- a/src/bgw/job_stat.c +++ b/src/bgw/job_stat.c @@ -49,6 +49,7 @@ bgw_job_stat_scan_one(int indexid, ScanKeyData scankey[], int nkeys, tuple_found .index = catalog_get_index(catalog, BGW_JOB_STAT, indexid), .nkeys = nkeys, .scankey = scankey, + .flags = SCANNER_F_KEEPLOCK, .tuple_found = tuple_found, .filter = tuple_filter, .data = data, @@ -104,7 +105,11 @@ bgw_job_stat_tuple_delete(TupleInfo *ti, void *const data) void ts_bgw_job_stat_delete(int32 bgw_job_id) { - bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_delete, NULL, NULL, RowExclusiveLock); + bgw_job_stat_scan_job_id(bgw_job_id, + bgw_job_stat_tuple_delete, + NULL, + NULL, + ShareRowExclusiveLock); } /* Mark the start of a job. 
This should be done in a separate transaction by the scheduler @@ -441,25 +446,21 @@ bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start, void ts_bgw_job_stat_mark_start(int32 bgw_job_id) { - /* Use double-check locking */ + /* We grab a ShareRowExclusiveLock here because we need to ensure that no + * job races and adds a job when we insert the relation as well since that + * can trigger a failure when inserting a row for the job. We use the + * RowExclusiveLock in the scan since we cannot use NoLock (relation_open + * requires a lock that it not NoLock). */ + Relation rel = + table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock); if (!bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_mark_start, NULL, NULL, RowExclusiveLock)) - { - Relation rel = - table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock); - /* Recheck while having a self-exclusive lock */ - if (!bgw_job_stat_scan_job_id(bgw_job_id, - bgw_job_stat_tuple_mark_start, - NULL, - NULL, - RowExclusiveLock)) - bgw_job_stat_insert_relation(rel, bgw_job_id, true, DT_NOBEGIN); - table_close(rel, ShareRowExclusiveLock); - pgstat_report_activity(STATE_IDLE, NULL); - } + bgw_job_stat_insert_relation(rel, bgw_job_id, true, DT_NOBEGIN); + table_close(rel, NoLock); + pgstat_report_activity(STATE_IDLE, NULL); } void @@ -474,7 +475,7 @@ ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result) bgw_job_stat_tuple_mark_end, NULL, &res, - RowExclusiveLock)) + ShareRowExclusiveLock)) elog(ERROR, "unable to find job statistics for job %d", job->fd.id); pgstat_report_activity(STATE_IDLE, NULL); } @@ -496,7 +497,7 @@ ts_bgw_job_stat_set_next_start(int32 job_id, TimestampTz next_start) bgw_job_stat_tuple_set_next_start, NULL, &next_start, - RowExclusiveLock)) + ShareRowExclusiveLock)) elog(ERROR, "unable to find job statistics for job %d", job_id); } @@ -513,7 +514,7 @@ ts_bgw_job_stat_update_next_start(int32 job_id, TimestampTz 
next_start, bool all bgw_job_stat_tuple_set_next_start, NULL, &next_start, - RowExclusiveLock); + ShareRowExclusiveLock); return found; } @@ -524,24 +525,20 @@ ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id, TimestampTz next_start) if (next_start == DT_NOBEGIN) elog(ERROR, "cannot set next start to -infinity"); - /* Use double-check locking */ + /* We grab a ShareRowExclusiveLock here because we need to ensure that no + * job races and adds a job when we insert the relation as well since that + * can trigger a failure when inserting a row for the job. We use the + * RowExclusiveLock in the scan since we cannot use NoLock (relation_open + * requires a lock that it not NoLock). */ + Relation rel = + table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock); if (!bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_set_next_start, NULL, &next_start, RowExclusiveLock)) - { - Relation rel = - table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock); - /* Recheck while having a self-exclusive lock */ - if (!bgw_job_stat_scan_job_id(bgw_job_id, - bgw_job_stat_tuple_set_next_start, - NULL, - &next_start, - RowExclusiveLock)) - bgw_job_stat_insert_relation(rel, bgw_job_id, false, next_start); - table_close(rel, ShareRowExclusiveLock); - } + bgw_job_stat_insert_relation(rel, bgw_job_id, false, next_start); + table_close(rel, NoLock); } bool diff --git a/src/bgw/scheduler.c b/src/bgw/scheduler.c index 79679500a..2dbca0568 100644 --- a/src/bgw/scheduler.c +++ b/src/bgw/scheduler.c @@ -134,7 +134,10 @@ ts_bgw_start_worker(const char *name, const BgwParams *bgw_params) /* handle needs to be allocated in long-lived memory context */ MemoryContextSwitchTo(scheduler_mctx); if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + elog(NOTICE, "unable to register background worker"); handle = NULL; + } MemoryContextSwitchTo(scratch_mctx); return handle; diff --git a/test/src/bgw/scheduler_mock.c 
b/test/src/bgw/scheduler_mock.c index 011be0c6e..f1b0107a5 100644 --- a/test/src/bgw/scheduler_mock.c +++ b/test/src/bgw/scheduler_mock.c @@ -69,8 +69,8 @@ ts_bgw_db_scheduler_test_main(PG_FUNCTION_ARGS) memcpy(&bgw_params, MyBgworkerEntry->bgw_extra, sizeof(bgw_params)); - elog(WARNING, "scheduler user id %u", bgw_params.user_oid); - elog(WARNING, "running a test in the background: db=%u ttl=%d", db_oid, bgw_params.ttl); + elog(NOTICE, "scheduler user id %u", bgw_params.user_oid); + elog(NOTICE, "running a test in the background: db=%u ttl=%d", db_oid, bgw_params.ttl); BackgroundWorkerInitializeConnectionByOid(db_oid, bgw_params.user_oid, 0);