Fix alter_job not updating new job fields

Fix bgw_job_tuple_update_by_id to also update scheduled and config
field of bgw_job.
This commit is contained in:
Sven Klemm 2020-09-16 03:16:35 +02:00 committed by Sven Klemm
parent 17cc6f6bd7
commit 6d7d99a588
3 changed files with 104 additions and 25 deletions

View File

@ -901,31 +901,35 @@ ts_bgw_job_insert_relation(Name application_name, Name job_type, Interval *sched
return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)]; return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)];
} }
/* This function only updates the fields modifiable with alter_job. */
static ScanTupleResult static ScanTupleResult
bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data) bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
{ {
BgwJob *updated_job = (BgwJob *) data; BgwJob *updated_job = (BgwJob *) data;
bool should_free; bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
HeapTuple new_tuple = heap_copytuple(tuple); HeapTuple new_tuple;
FormData_bgw_job *fd = (FormData_bgw_job *) GETSTRUCT(new_tuple);
TimestampTz next_start;
if (should_free) Datum values[Natts_bgw_job] = { 0 };
heap_freetuple(tuple); bool isnull[Natts_bgw_job] = { 0 };
bool repl[Natts_bgw_job] = { 0 };
Datum old_schedule_interval =
slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
Assert(!isnull[0]);
ts_bgw_job_permission_check(updated_job); ts_bgw_job_permission_check(updated_job);
/* when we update the schedule interval, modify the next start time as well*/ /* when we update the schedule interval, modify the next start time as well*/
if (!DatumGetBool(DirectFunctionCall2(interval_eq, if (!DatumGetBool(DirectFunctionCall2(interval_eq,
IntervalPGetDatum(&fd->schedule_interval), old_schedule_interval,
IntervalPGetDatum(&updated_job->fd.schedule_interval)))) IntervalPGetDatum(&updated_job->fd.schedule_interval))))
{ {
BgwJobStat *stat = ts_bgw_job_stat_find(fd->id); BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id);
if (stat != NULL) if (stat != NULL)
{ {
next_start = DatumGetTimestampTz( TimestampTz next_start = DatumGetTimestampTz(
DirectFunctionCall2(timestamptz_pl_interval, DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(stat->fd.last_finish), TimestampTzGetDatum(stat->fd.last_finish),
IntervalPGetDatum(&updated_job->fd.schedule_interval))); IntervalPGetDatum(&updated_job->fd.schedule_interval)));
@ -934,21 +938,54 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
* This means the value is counted as unset which is what we want */ * This means the value is counted as unset which is what we want */
ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true); ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true);
} }
fd->schedule_interval = updated_job->fd.schedule_interval; values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] =
IntervalPGetDatum(&updated_job->fd.schedule_interval);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true;
} }
fd->max_runtime = updated_job->fd.max_runtime;
fd->max_retries = updated_job->fd.max_retries; values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] =
fd->retry_period = updated_job->fd.retry_period; IntervalPGetDatum(&updated_job->fd.max_runtime);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] =
Int32GetDatum(updated_job->fd.max_retries);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] =
IntervalPGetDatum(&updated_job->fd.retry_period);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] =
BoolGetDatum(updated_job->fd.scheduled);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;
repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
if (updated_job->fd.config)
values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
JsonbPGetDatum(updated_job->fd.config);
else
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl);
ts_catalog_update(ti->scanrel, new_tuple); ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple); heap_freetuple(new_tuple);
if (should_free)
heap_freetuple(tuple);
return SCAN_DONE; return SCAN_DONE;
} }
static bool /*
bgw_job_update_scan(ScanKeyData *scankey, void *data) * Overwrite job with specified job_id with the given fields
*
* This function only updates the fields modifiable with alter_job.
*/
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{ {
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get(); Catalog *catalog = ts_catalog_get();
ScanTupLock scantuplock = { ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock, .waitpolicy = LockWaitBlock,
@ -958,7 +995,7 @@ bgw_job_update_scan(ScanKeyData *scankey, void *data)
.index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX), .index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
.nkeys = 1, .nkeys = 1,
.scankey = scankey, .scankey = scankey,
.data = data, .data = job,
.limit = 1, .limit = 1,
.tuple_found = bgw_job_tuple_update_by_id, .tuple_found = bgw_job_tuple_update_by_id,
.lockmode = RowExclusiveLock, .lockmode = RowExclusiveLock,
@ -966,20 +1003,11 @@ bgw_job_update_scan(ScanKeyData *scankey, void *data)
.result_mctx = CurrentMemoryContext, .result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock }; .tuplock = &scantuplock };
return ts_scanner_scan(&scanctx);
}
/* Overwrite job with specified job_id with the given fields */
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{
ScanKeyData scankey[1];
ScanKeyInit(&scankey[0], ScanKeyInit(&scankey[0],
Anum_bgw_job_pkey_idx_id, Anum_bgw_job_pkey_idx_id,
BTEqualStrategyNumber, BTEqualStrategyNumber,
F_INT4EQ, F_INT4EQ,
Int32GetDatum(job_id)); Int32GetDatum(job_id));
bgw_job_update_scan(scankey, (void *) job); ts_scanner_scan(&scanctx);
} }

View File

@ -116,3 +116,46 @@ SELECT job_id FROM alter_job(1,scheduled:=false);
1 1
(1 row) (1 row)
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+--------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | |
(1 row)
-- test updating job settings
SELECT job_id FROM alter_job(1,config:='{"test":"test"}');
job_id
--------
1
(1 row)
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"}
(1 row)
SELECT job_id FROM alter_job(1,scheduled:=true);
job_id
--------
1
(1 row)
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | t | | {"test": "test"}
(1 row)
SELECT job_id FROM alter_job(1,scheduled:=false);
job_id
--------
1
(1 row)
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config
----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------
1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"}
(1 row)

View File

@ -59,4 +59,12 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000;
\c :TEST_DBNAME :ROLE_SUPERUSER \c :TEST_DBNAME :ROLE_SUPERUSER
-- test altering job with NULL config -- test altering job with NULL config
SELECT job_id FROM alter_job(1,scheduled:=false); SELECT job_id FROM alter_job(1,scheduled:=false);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
-- test updating job settings
SELECT job_id FROM alter_job(1,config:='{"test":"test"}');
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
SELECT job_id FROM alter_job(1,scheduled:=true);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;
SELECT job_id FROM alter_job(1,scheduled:=false);
SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;