Pass parameters to workers as a struct

Non-functional change.

Parameters to workers were passed in as a serialized string, which then
needs to be serialized and deserialized using dedicated functions.

This commit refactors code to pass parameters to workers as a struct,
which is then just copied into the `bgw_extra` field of
`BackgroundWorker`. The struct contains simple values and can therefore
be copied using memcpy(3c).
This commit is contained in:
Mats Kindahl 2022-05-24 10:47:57 +02:00 committed by Mats Kindahl
parent 5c0110cbbf
commit 533e849c57
5 changed files with 99 additions and 97 deletions

View File

@ -56,22 +56,15 @@ typedef enum JobLockLifetime
} JobLockLifetime;
BackgroundWorkerHandle *
ts_bgw_job_start(BgwJob *job, Oid user_uid)
ts_bgw_job_start(BgwJob *job, Oid user_oid)
{
int32 job_id = Int32GetDatum(job->fd.id);
StringInfo si = makeStringInfo();
BackgroundWorkerHandle *bgw_handle;
BgwParams bgw_params = {
.job_id = Int32GetDatum(job->fd.id),
.user_oid = user_oid,
};
strlcpy(bgw_params.bgw_main, job_entrypoint_function_name, sizeof(bgw_params.bgw_main));
/* Changing this requires changes to ts_bgw_job_entrypoint */
appendStringInfo(si, "%u %d", user_uid, job_id);
bgw_handle = ts_bgw_start_worker(job_entrypoint_function_name,
NameStr(job->fd.application_name),
si->data);
pfree(si->data);
pfree(si);
return bgw_handle;
return ts_bgw_start_worker(NameStr(job->fd.application_name), &bgw_params);
}
static BgwJob *
@ -825,14 +818,13 @@ extern Datum
ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
{
Oid db_oid = DatumGetObjectId(MyBgworkerEntry->bgw_main_arg);
Oid user_uid;
int32 job_id;
BgwParams params;
BgwJob *job;
JobResult res = JOB_FAILURE;
bool got_lock;
if (sscanf(MyBgworkerEntry->bgw_extra, "%u %d", &user_uid, &job_id) != 2)
elog(ERROR, "job entrypoint got invalid bgw_extra");
memcpy(&params, MyBgworkerEntry->bgw_extra, sizeof(BgwParams));
Assert(params.user_oid != 0 && params.job_id != 0);
BackgroundWorkerBlockSignals();
/* Setup any signal handlers here */
@ -844,16 +836,14 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
elog(DEBUG1, "started background job %d", job_id);
BackgroundWorkerInitializeConnectionByOid(db_oid, user_uid, 0);
BackgroundWorkerInitializeConnectionByOid(db_oid, params.user_oid, 0);
ts_license_enable_module_loading();
StartTransactionCommand();
/* Grab a session lock on the job row to prevent concurrent deletes. Lock is released
* when the job process exits */
job = ts_bgw_job_find_with_lock(job_id,
job = ts_bgw_job_find_with_lock(params.job_id,
TopMemoryContext,
RowShareLock,
SESSION_LOCK,
@ -862,7 +852,7 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
CommitTransactionCommand();
if (job == NULL)
elog(ERROR, "job %d not found when running the background worker", job_id);
elog(ERROR, "job %d not found when running the background worker", params.job_id);
pgstat_report_appname(NameStr(job->fd.application_name));
@ -906,7 +896,7 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
* removed the session lock. Don't block and only record if the lock was actually
* obtained.
*/
job = ts_bgw_job_find_with_lock(job_id,
job = ts_bgw_job_find_with_lock(params.job_id,
TopMemoryContext,
RowShareLock,
TXN_LOCK,
@ -925,7 +915,7 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
* the rethrow will log the error; but also log which job threw the
* error
*/
elog(LOG, "job %d threw an error", job_id);
elog(LOG, "job %d threw an error", params.job_id);
PG_RE_THROW();
}
PG_END_TRY();
@ -948,7 +938,10 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
job = NULL;
}
elog(DEBUG1, "exiting job %d with %s", job_id, (res == JOB_SUCCESS ? "success" : "failure"));
elog(DEBUG1,
"exiting job %d with %s",
params.job_id,
(res == JOB_SUCCESS ? "success" : "failure"));
PG_RETURN_VOID();
}

View File

@ -39,6 +39,7 @@
#include "scheduler.h"
#include "timer.h"
#include "version.h"
#include "worker.h"
#define SCHEDULER_APPNAME "TimescaleDB Background Worker Scheduler"
#define START_RETRY_MS (1 * INT64CONST(1000)) /* 1 seconds */
@ -112,7 +113,7 @@ static void on_failure_to_start_job(ScheduledBgwJob *sjob);
static volatile sig_atomic_t got_SIGHUP = false;
BackgroundWorkerHandle *
ts_bgw_start_worker(const char *function, const char *name, const char *extra)
ts_bgw_start_worker(const char *name, const BgwParams *bgw_params)
{
BackgroundWorker worker = {
.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
@ -125,10 +126,9 @@ ts_bgw_start_worker(const char *function, const char *name, const char *extra)
strlcpy(worker.bgw_name, name, BGW_MAXLEN);
strlcpy(worker.bgw_library_name, ts_extension_get_so_name(), BGW_MAXLEN);
strlcpy(worker.bgw_function_name, function, BGW_MAXLEN);
strlcpy(worker.bgw_function_name, bgw_params->bgw_main, sizeof(worker.bgw_function_name));
Assert(strlen(extra) < BGW_EXTRALEN);
strlcpy(worker.bgw_extra, extra, BGW_EXTRALEN);
memcpy(worker.bgw_extra, bgw_params, sizeof(*bgw_params));
/* handle needs to be allocated in long-lived memory context */
MemoryContextSwitchTo(scheduler_mctx);

View File

@ -11,6 +11,7 @@
#include <postmaster/bgworker.h>
#include "timer.h"
#include "worker.h"
typedef struct ScheduledBgwJob ScheduledBgwJob;
@ -33,7 +34,6 @@ extern void ts_bgw_job_cache_invalidate_callback(void);
extern void ts_bgw_scheduler_register_signal_handlers(void);
extern void ts_bgw_scheduler_setup_mctx(void);
extern BackgroundWorkerHandle *ts_bgw_start_worker(const char *function, const char *name,
const char *extra);
extern BackgroundWorkerHandle *ts_bgw_start_worker(const char *name, const BgwParams *bgw_params);
#endif /* BGW_SCHEDULER_H */

58
src/bgw/worker.h Normal file
View File

@ -0,0 +1,58 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef BGW_WORKER_H
#define BGW_WORKER_H
#include <postgres.h>
#include <postmaster/bgworker.h>
/**
* Parameters to background workers.
*
* Do not add data here that cannot be simply copied to the background worker
* using memcpy(3). If it is necessary to add fields that cannot simply be
* copied, we need to start using the send and recv functions for the types.
*
* Only one of `job_id` and `ttl` is passed currently, with `job_id` being used
* for normal jobs and `ttl` being used for tests.
*
* The `bgw_main` is the function to execute when starting the job and is
* different depending on whether this is a test runner or the real runner.
*
* @see ts_bgw_db_scheduler_test_main
* @see ts_bgw_job_entrypoint
*/
typedef struct BgwParams
{
/** User oid to run the job as. Used when initializing the database
* connection. */
Oid user_oid;
/** Job id to use for the worker when executing the job */
int32 job_id;
/** Time to live. Only used in tests. */
int32 ttl;
/** Name of function to call when starting the background worker. */
char bgw_main[NAMEDATALEN];
} BgwParams;
/**
* Compile-time check that the size of BgwParams fit into the bgw_extra field
* of BackgroundWorker. Relies on the fact that you cannot have arrays with
* negative size.
*
* We cannot use StaticAssertDecl (yet) since it does not exist in PG12 and
* not checking this for PG12 could potentially generate hard-to-find
* problems.
*/
static char length_check[sizeof(((BackgroundWorker *) 0)->bgw_extra) -
sizeof(BgwParams)] pg_attribute_unused();
#endif /* BGW_WORKER_H */

View File

@ -55,62 +55,11 @@ static const char *test_job_type_names[_MAX_TEST_JOB_TYPE] = {
[TEST_JOB_TYPE_JOB_4] = "bgw_test_job_4",
};
static char *
serialize_test_parameters(int32 ttl)
{
JsonbValue *result;
JsonbValue ttl_value;
JsonbParseState *parse_state = NULL;
Jsonb *jb;
StringInfo jtext = makeStringInfo();
JsonbValue user_oid;
user_oid.type = jbvNumeric;
user_oid.val.numeric =
DatumGetNumeric(DirectFunctionCall1(int4_numeric, Int32GetDatum((int32) GetUserId())));
ttl_value.type = jbvNumeric;
ttl_value.val.numeric = DatumGetNumeric(DirectFunctionCall1(int4_numeric, Int32GetDatum(ttl)));
result = pushJsonbValue(&parse_state, WJB_BEGIN_ARRAY, NULL);
result = pushJsonbValue(&parse_state, WJB_ELEM, &ttl_value);
result = pushJsonbValue(&parse_state, WJB_ELEM, &user_oid);
result = pushJsonbValue(&parse_state, WJB_END_ARRAY, NULL);
jb = JsonbValueToJsonb(result);
(void) JsonbToCString(jtext, &jb->root, VARSIZE(jb));
TestAssertTrue(jtext->len < BGW_EXTRALEN);
return jtext->data;
}
static void
deserialize_test_parameters(char *params, int32 *ttl, Oid *user_oid)
{
Jsonb *jb = (Jsonb *) DatumGetPointer(DirectFunctionCall1(jsonb_in, CStringGetDatum(params)));
JsonbValue *ttl_v = getIthJsonbValueFromContainer(&jb->root, 0);
JsonbValue *user_v = getIthJsonbValueFromContainer(&jb->root, 1);
Numeric ttl_numeric;
Numeric user_numeric;
TestAssertTrue(ttl_v->type == jbvNumeric);
TestAssertTrue(user_v->type == jbvNumeric);
ttl_numeric = ttl_v->val.numeric;
user_numeric = user_v->val.numeric;
*ttl = DatumGetInt32(DirectFunctionCall1(numeric_int4, NumericGetDatum(ttl_numeric)));
*user_oid =
(Oid) DatumGetInt32(DirectFunctionCall1(numeric_int4, NumericGetDatum(user_numeric)));
}
extern Datum
ts_bgw_db_scheduler_test_main(PG_FUNCTION_ARGS)
{
Oid db_oid = DatumGetObjectId(MyBgworkerEntry->bgw_main_arg);
int32 ttl;
Oid user_oid;
BgwParams bgw_params;
BackgroundWorkerBlockSignals();
/* Setup any signal handlers here */
@ -118,12 +67,12 @@ ts_bgw_db_scheduler_test_main(PG_FUNCTION_ARGS)
BackgroundWorkerUnblockSignals();
ts_bgw_scheduler_setup_callbacks();
deserialize_test_parameters(MyBgworkerEntry->bgw_extra, &ttl, &user_oid);
memcpy(&bgw_params, MyBgworkerEntry->bgw_extra, sizeof(bgw_params));
elog(WARNING, "scheduler user id %u", user_oid);
elog(WARNING, "running a test in the background: db=%u ttl=%d", db_oid, ttl);
elog(WARNING, "scheduler user id %u", bgw_params.user_oid);
elog(WARNING, "running a test in the background: db=%u ttl=%d", db_oid, bgw_params.ttl);
BackgroundWorkerInitializeConnectionByOid(db_oid, user_oid, 0);
BackgroundWorkerInitializeConnectionByOid(db_oid, bgw_params.user_oid, 0);
StartTransactionCommand();
ts_params_get();
@ -141,33 +90,36 @@ ts_bgw_db_scheduler_test_main(PG_FUNCTION_ARGS)
ts_bgw_scheduler_setup_mctx();
ts_bgw_scheduler_process(ttl, ts_timer_mock_register_bgw_handle);
ts_bgw_scheduler_process(bgw_params.ttl, ts_timer_mock_register_bgw_handle);
PG_RETURN_VOID();
}
static BackgroundWorkerHandle *
start_test_scheduler(char *params)
start_test_scheduler(int32 ttl, Oid user_oid)
{
const BgwParams bgw_params = {
.bgw_main = "ts_bgw_db_scheduler_test_main",
.ttl = ttl,
.user_oid = user_oid,
};
/*
* This is where we would increment the number of bgw used, if we
* decide to do so
*/
ts_bgw_scheduler_setup_mctx();
return ts_bgw_start_worker("ts_bgw_db_scheduler_test_main",
"ts_bgw_db_scheduler_test_main",
params);
return ts_bgw_start_worker("ts_bgw_db_scheduler_test_main", &bgw_params);
}
extern Datum
ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(PG_FUNCTION_ARGS)
{
char *params = serialize_test_parameters(PG_GETARG_INT32(0));
BackgroundWorkerHandle *worker_handle;
pid_t pid;
worker_handle = start_test_scheduler(params);
worker_handle = start_test_scheduler(PG_GETARG_INT32(0), GetUserId());
if (worker_handle != NULL)
{
@ -190,13 +142,12 @@ static BackgroundWorkerHandle *current_handle = NULL;
extern Datum
ts_bgw_db_scheduler_test_run(PG_FUNCTION_ARGS)
{
char *params = serialize_test_parameters(PG_GETARG_INT32(0));
pid_t pid;
MemoryContext old_ctx;
BgwHandleStatus status;
old_ctx = MemoryContextSwitchTo(TopMemoryContext);
current_handle = start_test_scheduler(params);
current_handle = start_test_scheduler(PG_GETARG_INT32(0), GetUserId());
MemoryContextSwitchTo(old_ctx);
status = WaitForBackgroundWorkerStartup(current_handle, &pid);