diff --git a/src/bgw/job.c b/src/bgw/job.c index ec9799ae0..c2a5ed58a 100644 --- a/src/bgw/job.c +++ b/src/bgw/job.c @@ -901,31 +901,35 @@ ts_bgw_job_insert_relation(Name application_name, Name job_type, Interval *sched return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)]; } +/* This function only updates the fields modifiable with alter_job. */ static ScanTupleResult bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data) { BgwJob *updated_job = (BgwJob *) data; bool should_free; HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); - HeapTuple new_tuple = heap_copytuple(tuple); - FormData_bgw_job *fd = (FormData_bgw_job *) GETSTRUCT(new_tuple); - TimestampTz next_start; + HeapTuple new_tuple; - if (should_free) - heap_freetuple(tuple); + Datum values[Natts_bgw_job] = { 0 }; + bool isnull[Natts_bgw_job] = { 0 }; + bool repl[Natts_bgw_job] = { 0 }; + + Datum old_schedule_interval = + slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]); + Assert(!isnull[0]); ts_bgw_job_permission_check(updated_job); /* when we update the schedule interval, modify the next start time as well*/ if (!DatumGetBool(DirectFunctionCall2(interval_eq, - IntervalPGetDatum(&fd->schedule_interval), + old_schedule_interval, IntervalPGetDatum(&updated_job->fd.schedule_interval)))) { - BgwJobStat *stat = ts_bgw_job_stat_find(fd->id); + BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id); if (stat != NULL) { - next_start = DatumGetTimestampTz( + TimestampTz next_start = DatumGetTimestampTz( DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(stat->fd.last_finish), IntervalPGetDatum(&updated_job->fd.schedule_interval))); @@ -934,21 +938,54 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data) * This means the value is counted as unset which is what we want */ ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true); } - fd->schedule_interval = updated_job->fd.schedule_interval; + values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = + IntervalPGetDatum(&updated_job->fd.schedule_interval); + repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true; } - fd->max_runtime = updated_job->fd.max_runtime; - fd->max_retries = updated_job->fd.max_retries; - fd->retry_period = updated_job->fd.retry_period; + + values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = + IntervalPGetDatum(&updated_job->fd.max_runtime); + repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true; + + values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = + Int32GetDatum(updated_job->fd.max_retries); + repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true; + + values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = + IntervalPGetDatum(&updated_job->fd.retry_period); + repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true; + + values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = + BoolGetDatum(updated_job->fd.scheduled); + repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true; + + repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true; + if (updated_job->fd.config) + values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = + JsonbPGetDatum(updated_job->fd.config); + else + isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true; + + new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl); ts_catalog_update(ti->scanrel, new_tuple); + heap_freetuple(new_tuple); + if (should_free) + heap_freetuple(tuple); return SCAN_DONE; } -static bool -bgw_job_update_scan(ScanKeyData *scankey, void *data) +/* + * Overwrite job with specified job_id with the given fields + * + * This function only updates the fields modifiable with alter_job. + */ +void +ts_bgw_job_update_by_id(int32 job_id, BgwJob *job) { + ScanKeyData scankey[1]; Catalog *catalog = ts_catalog_get(); ScanTupLock scantuplock = { .waitpolicy = LockWaitBlock, @@ -958,7 +995,7 @@ bgw_job_update_scan(ScanKeyData *scankey, void *data) .index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX), .nkeys = 1, .scankey = scankey, - .data = data, + .data = job, .limit = 1, .tuple_found = bgw_job_tuple_update_by_id, .lockmode = RowExclusiveLock, @@ -966,20 +1003,11 @@ bgw_job_update_scan(ScanKeyData *scankey, void *data) .result_mctx = CurrentMemoryContext, .tuplock = &scantuplock }; - return ts_scanner_scan(&scanctx); -} - -/* Overwrite job with specified job_id with the given fields */ -void -ts_bgw_job_update_by_id(int32 job_id, BgwJob *job) -{ - ScanKeyData scankey[1]; - ScanKeyInit(&scankey[0], Anum_bgw_job_pkey_idx_id, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(job_id)); - bgw_job_update_scan(scankey, (void *) job); + ts_scanner_scan(&scanctx); } diff --git a/tsl/test/expected/bgw_custom.out b/tsl/test/expected/bgw_custom.out index 3a456a2db..e0b788f6d 100644 --- a/tsl/test/expected/bgw_custom.out +++ b/tsl/test/expected/bgw_custom.out @@ -116,3 +116,46 @@ SELECT job_id FROM alter_job(1,scheduled:=false); 1 (1 row) +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config +----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+-------- + 1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | +(1 row) + +-- test updating job settings +SELECT job_id FROM alter_job(1,config:='{"test":"test"}'); + job_id +-------- + 1 +(1 row) + +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config +----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------ + 1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"} +(1 row) + +SELECT job_id FROM alter_job(1,scheduled:=true); + job_id +-------- + 1 +(1 row) + +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config +----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------ + 1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | t | | {"test": "test"} +(1 row) + +SELECT job_id FROM alter_job(1,scheduled:=false); + job_id +-------- + 1 +(1 row) + +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; + id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | hypertable_id | config +----+------------------------+-------------------+-----------------+-------------+--------------+-----------------------+------------------+------------+-----------+---------------+------------------ + 1 | Telemetry Reporter [1] | @ 24 hours | @ 1 min 40 secs | -1 | @ 1 hour | _timescaledb_internal | policy_telemetry | super_user | f | | {"test": "test"} +(1 row) + diff --git a/tsl/test/sql/bgw_custom.sql b/tsl/test/sql/bgw_custom.sql index b4485f60b..95864a32d 100644 --- a/tsl/test/sql/bgw_custom.sql +++ b/tsl/test/sql/bgw_custom.sql @@ -59,4 +59,12 @@ SELECT * FROM _timescaledb_config.bgw_job WHERE id >= 1000; \c :TEST_DBNAME :ROLE_SUPERUSER -- test altering job with NULL config SELECT job_id FROM alter_job(1,scheduled:=false); +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; +-- test updating job settings +SELECT job_id FROM alter_job(1,config:='{"test":"test"}'); +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; +SELECT job_id FROM alter_job(1,scheduled:=true); +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1; +SELECT job_id FROM alter_job(1,scheduled:=false); +SELECT * FROM _timescaledb_config.bgw_job WHERE id = 1;