Fix background worker scheduler memory consumption

This patch changes how the scheduler handles memory contexts.
Previously only memory allocated during transactions would get
freed and everything else remained allocated.

The scheduler now uses 2 memory contexts for its operation: scheduler_mctx
for long-lived objects and scratch_mctx for short-lived objects.
After every iteration of the scheduling main loop scratch_mctx gets
reset. Special care needs to be taken with regard to memory contexts,
since StartTransactionCommand creates and switches to a transaction
memory context, which gets deleted on CommitTransactionCommand, which
in turn switches CurrentMemoryContext back to TopMemoryContext. So
operations wrapped in Start/CommitTransactionCommand will not happen in
scratch_mctx but will get freed on CommitTransactionCommand.
This commit is contained in:
Sven Klemm 2020-08-02 19:42:13 +02:00 committed by Sven Klemm
parent 393e5b9c1a
commit 02dae3a5fb
5 changed files with 82 additions and 29 deletions

View File

@ -33,6 +33,7 @@
#include "bgw_policy/chunk_stats.h"
#include "bgw_policy/policy.h"
#include "scan_iterator.h"
#include "bgw/scheduler.h"
#include <cross_module_fn.h>
@ -77,30 +78,6 @@ typedef enum JobLockLifetime
TXN_LOCK,
} JobLockLifetime;
/*
 * Register a dynamic background worker for the current database.
 *
 * function: entry point name, copied into bgw_function_name
 * name:     worker name, copied into bgw_name
 * extra:    payload copied into bgw_extra; asserted to be shorter than
 *           BGW_EXTRALEN
 *
 * Returns the handle filled in by RegisterDynamicBackgroundWorker, or NULL
 * when registration fails.
 */
BackgroundWorkerHandle *
ts_bgw_start_worker(const char *function, const char *name, const char *extra)
{
	BackgroundWorkerHandle *bgw_handle = NULL;
	BackgroundWorker bgw = {
		.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId),
		.bgw_notify_pid = MyProcPid,
		.bgw_restart_time = BGW_NEVER_RESTART,
		.bgw_start_time = BgWorkerStart_RecoveryFinished,
		.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
	};

	/* Identify the worker: which function to run, under what name, from which library */
	StrNCpy(bgw.bgw_function_name, function, BGW_MAXLEN);
	StrNCpy(bgw.bgw_name, name, BGW_MAXLEN);
	StrNCpy(bgw.bgw_library_name, ts_extension_get_so_name(), BGW_MAXLEN);

	/* The extra payload must fit without truncation */
	Assert(strlen(extra) < BGW_EXTRALEN);
	StrNCpy(bgw.bgw_extra, extra, BGW_EXTRALEN);

	return RegisterDynamicBackgroundWorker(&bgw, &bgw_handle) ? bgw_handle : NULL;
}
Oid
ts_bgw_job_owner(BgwJob *job)
{

View File

@ -36,9 +36,6 @@ typedef bool job_main_func(void);
typedef bool (*unknown_job_type_hook_type)(BgwJob *job);
typedef Oid (*unknown_job_type_owner_hook_type)(BgwJob *job);
extern BackgroundWorkerHandle *ts_bgw_start_worker(const char *function, const char *name,
const char *extra);
extern BackgroundWorkerHandle *ts_bgw_job_start(BgwJob *job, Oid user_oid);
extern List *ts_bgw_job_get_scheduled(size_t alloc_size, MemoryContext mctx);

View File

@ -60,6 +60,9 @@ static bool jobs_list_needs_update;
/* has to be global to shutdown jobs on exit */
static List *scheduled_jobs = NIL;
static MemoryContext scheduler_mctx;
static MemoryContext scratch_mctx;
/* See the README for a state transition diagram */
typedef enum JobState
{
@ -108,6 +111,34 @@ static void on_failure_to_start_job(ScheduledBgwJob *sjob);
static volatile sig_atomic_t got_SIGHUP = false;
BackgroundWorkerHandle *
ts_bgw_start_worker(const char *function, const char *name, const char *extra)
{
BackgroundWorker worker = {
.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
.bgw_start_time = BgWorkerStart_RecoveryFinished,
.bgw_restart_time = BGW_NEVER_RESTART,
.bgw_notify_pid = MyProcPid,
.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId),
};
BackgroundWorkerHandle *handle = NULL;
StrNCpy(worker.bgw_name, name, BGW_MAXLEN);
StrNCpy(worker.bgw_library_name, ts_extension_get_so_name(), BGW_MAXLEN);
StrNCpy(worker.bgw_function_name, function, BGW_MAXLEN);
Assert(strlen(extra) < BGW_EXTRALEN);
StrNCpy(worker.bgw_extra, extra, BGW_EXTRALEN);
/* handle needs to be allocated in long-lived memory context */
MemoryContextSwitchTo(scheduler_mctx);
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
handle = NULL;
MemoryContextSwitchTo(scratch_mctx);
return handle;
}
#if USE_ASSERT_CHECKING
static void
assert_that_worker_has_stopped(ScheduledBgwJob *sjob)
@ -245,6 +276,7 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
sjob->job.fd.id);
ts_bgw_job_cache_invalidate_callback();
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
return;
}
@ -258,6 +290,7 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
NameStr(sjob->job.fd.application_name));
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
return;
}
@ -274,6 +307,7 @@ scheduled_bgw_job_transition_state_to(ScheduledBgwJob *sjob, JobState new_state)
owner_uid = ts_bgw_job_owner(&sjob->job);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
elog(DEBUG1,
"launching job %d \"%s\"",
@ -322,6 +356,7 @@ on_failure_to_start_job(ScheduledBgwJob *sjob)
}
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
}
static inline void
@ -373,6 +408,7 @@ scheduled_ts_bgw_job_start(ScheduledBgwJob *sjob,
StartTransactionCommand();
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
break;
case BGWH_NOT_YET_STARTED:
/* should not be possible */
@ -471,6 +507,7 @@ ts_update_scheduled_jobs_list(List *cur_jobs_list, MemoryContext mctx)
}
#ifdef TS_DEBUG
/* Only used by test code */
void
ts_populate_scheduled_job_tuple(ScheduledBgwJob *sjob, Datum *values)
@ -513,6 +550,7 @@ static void
start_scheduled_jobs(register_background_worker_callback_type bgw_register)
{
ListCell *lc;
Assert(CurrentMemoryContext == scratch_mctx);
/* Order jobs by increasing next_start */
List *ordered_scheduled_jobs = list_qsort(scheduled_jobs, cmp_next_start);
@ -653,6 +691,7 @@ check_for_stopped_and_timed_out_jobs()
StartTransactionCommand();
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
Assert(sjob->state != JOB_STATE_STARTED);
break;
}
@ -664,6 +703,16 @@ check_for_stopped_and_timed_out_jobs()
* the loop will exit). This functionality is used to ease testing.
* In production, ttl_ms should be < 0 to signal that the loop should
* run forever (or until the process gets a signal).
*
* The scheduler uses 2 memory contexts for its operation: scheduler_mctx
* for long-lived objects and scratch_mctx for short-lived objects.
* After every iteration of the scheduling main loop scratch_mctx gets
* reset. Special care needs to be taken with regard to memory contexts,
* since StartTransactionCommand creates and switches to a transaction
* memory context, which gets deleted on CommitTransactionCommand, which
* in turn switches CurrentMemoryContext back to TopMemoryContext. So
* operations wrapped in Start/CommitTransactionCommand will not happen in
* scratch_mctx but will get freed on CommitTransactionCommand.
*/
void
ts_bgw_scheduler_process(int32 run_for_interval_ms,
@ -671,12 +720,12 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
{
TimestampTz start = ts_timer_get_current_timestamp();
TimestampTz quit_time = DT_NOEND;
MemoryContext scheduler_mctx = CurrentMemoryContext;
/* txn to read the list of jobs from the DB */
StartTransactionCommand();
scheduled_jobs = ts_update_scheduled_jobs_list(scheduled_jobs, scheduler_mctx);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
jobs_list_needs_update = false;
@ -694,6 +743,7 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
while (quit_time > ts_timer_get_current_timestamp() && !ProcDiePending && !ts_shutdown_bgw)
{
TimestampTz next_wakeup = quit_time;
Assert(CurrentMemoryContext == scratch_mctx);
/* start jobs, and then check when to next wake up */
start_scheduled_jobs(bgw_register);
@ -716,16 +766,19 @@ ts_bgw_scheduler_process(int32 run_for_interval_ms,
*/
AcceptInvalidationMessages();
/* txn to read the list of jobs from the DB */
if (jobs_list_needs_update)
{
StartTransactionCommand();
Assert(CurrentMemoryContext == CurTransactionContext);
scheduled_jobs = ts_update_scheduled_jobs_list(scheduled_jobs, scheduler_mctx);
CommitTransactionCommand();
MemoryContextSwitchTo(scratch_mctx);
jobs_list_needs_update = false;
}
check_for_stopped_and_timed_out_jobs();
MemoryContextReset(scratch_mctx);
}
#ifdef TS_DEBUG
@ -751,6 +804,18 @@ ts_bgw_scheduler_setup_callbacks()
before_shmem_exit(bgw_scheduler_before_shmem_exit_callback, PointerGetDatum(NULL));
}
/*
 * Create the scheduler's memory contexts and make the scratch context
 * current.
 *
 * scheduler_mctx is created as a child of TopMemoryContext and scratch_mctx
 * as a child of scheduler_mctx. On return CurrentMemoryContext is
 * scratch_mctx.
 *
 * Some of the scheduler mock code calls functions from this file without
 * going through the main loop, so it needs a way to set up the memory
 * contexts.
 *
 * NOTE: every call creates a fresh pair of contexts and overwrites the
 * static pointers; contexts created by an earlier call are not deleted here.
 */
void
ts_bgw_scheduler_setup_mctx(void)
{
	scheduler_mctx = AllocSetContextCreate(TopMemoryContext, "Scheduler", ALLOCSET_DEFAULT_SIZES);
	scratch_mctx =
		AllocSetContextCreate(scheduler_mctx, "SchedulerScratch", ALLOCSET_DEFAULT_SIZES);
	MemoryContextSwitchTo(scratch_mctx);
}
static void handle_sigterm(SIGNAL_ARGS)
{
/*
@ -806,8 +871,14 @@ ts_bgw_scheduler_main(PG_FUNCTION_ARGS)
pgstat_report_appname(SCHEDULER_APPNAME);
ts_bgw_scheduler_setup_mctx();
ts_bgw_scheduler_process(-1, NULL);
Assert(scheduled_jobs == NIL);
MemoryContextSwitchTo(TopMemoryContext);
MemoryContextDelete(scheduler_mctx);
PG_RETURN_VOID();
};

View File

@ -31,5 +31,9 @@ extern void ts_bgw_scheduler_setup_callbacks(void);
extern void ts_bgw_job_cache_invalidate_callback(void);
extern void ts_bgw_scheduler_register_signal_handlers(void);
/*
 * Create the scheduler's long-lived and scratch memory contexts and switch
 * to the scratch context. Must be called before any scheduler function that
 * relies on those contexts.
 */
extern void ts_bgw_scheduler_setup_mctx(void);
/*
 * Register a dynamic background worker that runs `function` under the given
 * `name`, passing `extra` via bgw_extra. Returns the worker handle, allocated
 * while the scheduler's long-lived memory context is current, or NULL if
 * registration fails.
 */
extern BackgroundWorkerHandle *ts_bgw_start_worker(const char *function, const char *name,
const char *extra);
#endif /* BGW_SCHEDULER_H */

View File

@ -151,6 +151,8 @@ ts_bgw_db_scheduler_test_main(PG_FUNCTION_ARGS)
pgstat_report_appname("DB Scheduler Test");
ts_bgw_scheduler_setup_mctx();
ts_bgw_scheduler_process(ttl, ts_timer_mock_register_bgw_handle);
PG_RETURN_VOID();
@ -163,6 +165,8 @@ start_test_scheduler(char *params)
* This is where we would increment the number of bgw used, if we
* decide to do so
*/
ts_bgw_scheduler_setup_mctx();
return ts_bgw_start_worker("ts_bgw_db_scheduler_test_main",
"ts_bgw_db_scheduler_test_main",
params);