Keep locks after reading job status

When reading the job status table `bgw_job_stat` and after that
updating it, locks where released after the read, allowing a competing
session to update the job status and trigger a concurrent update error
either in a session doing the update or in the scheduler. Since the
scheduler does not recover after aborting with an error, this caused
the background worker subsystem to stop and not start new jobs.

This commit fixes this by upgrading `RowExclusiveLock` to
`ShareRowExclusiveLock` to ensure that not two sessions tries to update
the row at the same time, remove an initial speculative lock
that are taken when a job status row can be added, and also keeps the
lock until the end of the transaction to prevent other sessions to
update. Since these updating transactions are short, it should not
cause other threads to block long.

Fixes #4293
This commit is contained in:
Mats Kindahl 2022-06-17 13:10:46 +02:00 committed by Mats Kindahl
parent f6dd55a191
commit c0e193dd81
4 changed files with 34 additions and 35 deletions

View File

@ -591,7 +591,6 @@ ts_bgw_job_find(int32 bgw_job_id, MemoryContext mctx, bool fail_if_not_found)
elog(ERROR, "job %d not found", bgw_job_id);
return job;
;
}
static void
@ -1210,6 +1209,6 @@ ts_bgw_job_insert_relation(Name application_name, Interval *schedule_interval,
ts_catalog_insert_values(rel, desc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
table_close(rel, RowExclusiveLock);
table_close(rel, NoLock);
return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)];
}

View File

@ -49,6 +49,7 @@ bgw_job_stat_scan_one(int indexid, ScanKeyData scankey[], int nkeys, tuple_found
.index = catalog_get_index(catalog, BGW_JOB_STAT, indexid),
.nkeys = nkeys,
.scankey = scankey,
.flags = SCANNER_F_KEEPLOCK,
.tuple_found = tuple_found,
.filter = tuple_filter,
.data = data,
@ -104,7 +105,11 @@ bgw_job_stat_tuple_delete(TupleInfo *ti, void *const data)
void
ts_bgw_job_stat_delete(int32 bgw_job_id)
{
bgw_job_stat_scan_job_id(bgw_job_id, bgw_job_stat_tuple_delete, NULL, NULL, RowExclusiveLock);
bgw_job_stat_scan_job_id(bgw_job_id,
bgw_job_stat_tuple_delete,
NULL,
NULL,
ShareRowExclusiveLock);
}
/* Mark the start of a job. This should be done in a separate transaction by the scheduler
@ -441,25 +446,21 @@ bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start,
void
ts_bgw_job_stat_mark_start(int32 bgw_job_id)
{
/* Use double-check locking */
/* We grab a ShareRowExclusiveLock here because we need to ensure that no
* job races and adds a job when we insert the relation as well since that
* can trigger a failure when inserting a row for the job. We use the
* RowExclusiveLock in the scan since we cannot use NoLock (relation_open
* requires a lock that it not NoLock). */
Relation rel =
table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
if (!bgw_job_stat_scan_job_id(bgw_job_id,
bgw_job_stat_tuple_mark_start,
NULL,
NULL,
RowExclusiveLock))
{
Relation rel =
table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
/* Recheck while having a self-exclusive lock */
if (!bgw_job_stat_scan_job_id(bgw_job_id,
bgw_job_stat_tuple_mark_start,
NULL,
NULL,
RowExclusiveLock))
bgw_job_stat_insert_relation(rel, bgw_job_id, true, DT_NOBEGIN);
table_close(rel, ShareRowExclusiveLock);
pgstat_report_activity(STATE_IDLE, NULL);
}
bgw_job_stat_insert_relation(rel, bgw_job_id, true, DT_NOBEGIN);
table_close(rel, NoLock);
pgstat_report_activity(STATE_IDLE, NULL);
}
void
@ -474,7 +475,7 @@ ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result)
bgw_job_stat_tuple_mark_end,
NULL,
&res,
RowExclusiveLock))
ShareRowExclusiveLock))
elog(ERROR, "unable to find job statistics for job %d", job->fd.id);
pgstat_report_activity(STATE_IDLE, NULL);
}
@ -496,7 +497,7 @@ ts_bgw_job_stat_set_next_start(int32 job_id, TimestampTz next_start)
bgw_job_stat_tuple_set_next_start,
NULL,
&next_start,
RowExclusiveLock))
ShareRowExclusiveLock))
elog(ERROR, "unable to find job statistics for job %d", job_id);
}
@ -513,7 +514,7 @@ ts_bgw_job_stat_update_next_start(int32 job_id, TimestampTz next_start, bool all
bgw_job_stat_tuple_set_next_start,
NULL,
&next_start,
RowExclusiveLock);
ShareRowExclusiveLock);
return found;
}
@ -524,24 +525,20 @@ ts_bgw_job_stat_upsert_next_start(int32 bgw_job_id, TimestampTz next_start)
if (next_start == DT_NOBEGIN)
elog(ERROR, "cannot set next start to -infinity");
/* Use double-check locking */
/* We grab a ShareRowExclusiveLock here because we need to ensure that no
* job races and adds a job when we insert the relation as well since that
* can trigger a failure when inserting a row for the job. We use the
* RowExclusiveLock in the scan since we cannot use NoLock (relation_open
* requires a lock that it not NoLock). */
Relation rel =
table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
if (!bgw_job_stat_scan_job_id(bgw_job_id,
bgw_job_stat_tuple_set_next_start,
NULL,
&next_start,
RowExclusiveLock))
{
Relation rel =
table_open(catalog_get_table_id(ts_catalog_get(), BGW_JOB_STAT), ShareRowExclusiveLock);
/* Recheck while having a self-exclusive lock */
if (!bgw_job_stat_scan_job_id(bgw_job_id,
bgw_job_stat_tuple_set_next_start,
NULL,
&next_start,
RowExclusiveLock))
bgw_job_stat_insert_relation(rel, bgw_job_id, false, next_start);
table_close(rel, ShareRowExclusiveLock);
}
bgw_job_stat_insert_relation(rel, bgw_job_id, false, next_start);
table_close(rel, NoLock);
}
bool

View File

@ -134,7 +134,10 @@ ts_bgw_start_worker(const char *name, const BgwParams *bgw_params)
/* handle needs to be allocated in long-lived memory context */
MemoryContextSwitchTo(scheduler_mctx);
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
elog(NOTICE, "unable to register background worker");
handle = NULL;
}
MemoryContextSwitchTo(scratch_mctx);
return handle;

View File

@ -69,8 +69,8 @@ ts_bgw_db_scheduler_test_main(PG_FUNCTION_ARGS)
memcpy(&bgw_params, MyBgworkerEntry->bgw_extra, sizeof(bgw_params));
elog(WARNING, "scheduler user id %u", bgw_params.user_oid);
elog(WARNING, "running a test in the background: db=%u ttl=%d", db_oid, bgw_params.ttl);
elog(NOTICE, "scheduler user id %u", bgw_params.user_oid);
elog(NOTICE, "running a test in the background: db=%u ttl=%d", db_oid, bgw_params.ttl);
BackgroundWorkerInitializeConnectionByOid(db_oid, bgw_params.user_oid, 0);