Fix memory leaks part 1: BGWs

This commit fixes memory leaks in the Background Worker infrastructure.
Notably, it fixes a leak in the launcher and BGW scheduler where shared
message queue handles would not be freed, and it fixes a leak where
BackgroundWorkerHandles were not freed in the scheduler.
This commit is contained in:
Joshua Lockerman 2019-09-23 16:19:58 -04:00 committed by JLockerman
parent 7553cdcd54
commit 1cc1b7c14e
6 changed files with 75 additions and 9 deletions

View File

@ -17,3 +17,11 @@ leak:CheckMyDatabase
#pg_dump #pg_dump
leak:getSchemaData leak:getSchemaData
leak:dumpDumpableObject leak:dumpDumpableObject
#should live as long as the process
leak:ShmemInitHash
#test only functions
leak:deserialize_test_parameters
leak:ts_params_get
leak:test_job_dispatcher

View File

@ -125,13 +125,18 @@ ts_bgw_job_start(BgwJob *job, Oid user_uid)
{ {
int32 job_id = Int32GetDatum(job->fd.id); int32 job_id = Int32GetDatum(job->fd.id);
StringInfo si = makeStringInfo(); StringInfo si = makeStringInfo();
BackgroundWorkerHandle *bgw_handle;
/* Changing this requires changes to ts_bgw_job_entrypoint */ /* Changing this requires changes to ts_bgw_job_entrypoint */
appendStringInfo(si, "%u %d", user_uid, job_id); appendStringInfo(si, "%u %d", user_uid, job_id);
return ts_bgw_start_worker(job_entrypoint_function_name, bgw_handle = ts_bgw_start_worker(job_entrypoint_function_name,
NameStr(job->fd.application_name), NameStr(job->fd.application_name),
si->data); si->data);
pfree(si->data);
pfree(si);
return bgw_handle;
} }
static JobType static JobType
@ -443,6 +448,7 @@ ts_bgw_job_execute(BgwJob *job)
{ {
case JOB_TYPE_VERSION_CHECK: case JOB_TYPE_VERSION_CHECK:
{ {
bool next_start_set;
/* /*
* In the first 12 hours, we want telemetry to ping every * In the first 12 hours, we want telemetry to ping every
* hour. After that initial period, we default to the * hour. After that initial period, we default to the
@ -457,10 +463,12 @@ ts_bgw_job_execute(BgwJob *job)
Int32GetDatum(0), Int32GetDatum(0),
Float8GetDatum(0))); Float8GetDatum(0)));
return ts_bgw_job_run_and_set_next_start(job, next_start_set = ts_bgw_job_run_and_set_next_start(job,
ts_telemetry_main_wrapper, ts_telemetry_main_wrapper,
TELEMETRY_INITIAL_NUM_RUNS, TELEMETRY_INITIAL_NUM_RUNS,
one_hour); one_hour);
pfree(one_hour);
return next_start_set;
} }
case JOB_TYPE_REORDER: case JOB_TYPE_REORDER:
case JOB_TYPE_DROP_CHUNKS: case JOB_TYPE_DROP_CHUNKS:
@ -601,6 +609,16 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
AbortCurrentTransaction(); AbortCurrentTransaction();
StartTransactionCommand(); StartTransactionCommand();
/* Free the old job if it exists, it's no longer needed, and since it's
* in the TopMemoryContext it won't be freed otherwise.
*/
if (job != NULL)
{
pfree(job);
job = NULL;
}
/* /*
* Note that the mark_start happens in the scheduler right before the * Note that the mark_start happens in the scheduler right before the
* job is launched. Try to get a lock on the job again. Because the error * job is launched. Try to get a lock on the job again. Because the error
@ -613,7 +631,11 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
TXN_LOCK, TXN_LOCK,
/* block */ false); /* block */ false);
if (job != NULL) if (job != NULL)
{
ts_bgw_job_stat_mark_end(job, JOB_FAILURE); ts_bgw_job_stat_mark_end(job, JOB_FAILURE);
pfree(job);
job = NULL;
}
CommitTransactionCommand(); CommitTransactionCommand();
/* /*
@ -636,6 +658,12 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
ts_bgw_job_stat_mark_end(job, res); ts_bgw_job_stat_mark_end(job, res);
CommitTransactionCommand(); CommitTransactionCommand();
if (job != NULL)
{
pfree(job);
job = NULL;
}
elog(DEBUG1, "exiting job %d with %s", job_id, (res == JOB_SUCCESS ? "success" : "failure")); elog(DEBUG1, "exiting job %d with %s", job_id, (res == JOB_SUCCESS ? "success" : "failure"));
PG_RETURN_VOID(); PG_RETURN_VOID();

View File

@ -391,6 +391,7 @@ static void
scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry) scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry)
{ {
Assert(entry->state == DISABLED); Assert(entry->state == DISABLED);
Assert(entry->db_scheduler_handle == NULL);
scheduler_modify_state(entry, ENABLED); scheduler_modify_state(entry, ENABLED);
} }
@ -398,6 +399,7 @@ static void
scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry) scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry)
{ {
Assert(entry->state == ENABLED); Assert(entry->state == ENABLED);
Assert(entry->db_scheduler_handle == NULL);
/* Reserve a spot for this scheduler with BGW counter */ /* Reserve a spot for this scheduler with BGW counter */
if (!ts_bgw_total_workers_increment()) if (!ts_bgw_total_workers_increment())
{ {
@ -412,6 +414,11 @@ scheduler_state_trans_started_to_allocated(DbHashEntry *entry)
{ {
Assert(entry->state == STARTED); Assert(entry->state == STARTED);
Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED); Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
if (entry->db_scheduler_handle != NULL)
{
pfree(entry->db_scheduler_handle);
entry->db_scheduler_handle = NULL;
}
scheduler_modify_state(entry, ALLOCATED); scheduler_modify_state(entry, ALLOCATED);
} }
@ -422,9 +429,11 @@ scheduler_state_trans_allocated_to_started(DbHashEntry *entry)
bool worker_registered; bool worker_registered;
Assert(entry->state == ALLOCATED); Assert(entry->state == ALLOCATED);
Assert(entry->db_scheduler_handle == NULL);
worker_registered = worker_registered =
register_entrypoint_for_db(entry->db_oid, entry->vxid, &entry->db_scheduler_handle); register_entrypoint_for_db(entry->db_oid, entry->vxid, &entry->db_scheduler_handle);
if (!worker_registered) if (!worker_registered)
{ {
report_error_on_worker_register_failure(entry); report_error_on_worker_register_failure(entry);
@ -439,6 +448,7 @@ static void
scheduler_state_trans_enabled_to_disabled(DbHashEntry *entry) scheduler_state_trans_enabled_to_disabled(DbHashEntry *entry)
{ {
Assert(entry->state == ENABLED); Assert(entry->state == ENABLED);
Assert(entry->db_scheduler_handle == NULL);
scheduler_modify_state(entry, DISABLED); scheduler_modify_state(entry, DISABLED);
} }
@ -446,6 +456,8 @@ static void
scheduler_state_trans_allocated_to_disabled(DbHashEntry *entry) scheduler_state_trans_allocated_to_disabled(DbHashEntry *entry)
{ {
Assert(entry->state == ALLOCATED); Assert(entry->state == ALLOCATED);
Assert(entry->db_scheduler_handle == NULL);
ts_bgw_total_workers_decrement(); ts_bgw_total_workers_decrement();
scheduler_modify_state(entry, DISABLED); scheduler_modify_state(entry, DISABLED);
} }
@ -455,7 +467,13 @@ scheduler_state_trans_started_to_disabled(DbHashEntry *entry)
{ {
Assert(entry->state == STARTED); Assert(entry->state == STARTED);
Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED); Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
ts_bgw_total_workers_decrement(); ts_bgw_total_workers_decrement();
if (entry->db_scheduler_handle != NULL)
{
pfree(entry->db_scheduler_handle);
entry->db_scheduler_handle = NULL;
}
scheduler_modify_state(entry, DISABLED); scheduler_modify_state(entry, DISABLED);
} }
@ -510,7 +528,13 @@ launcher_pre_shmem_cleanup(int code, Datum arg)
* them anymore) * them anymore)
*/ */
while ((current_entry = hash_seq_search(&hash_seq)) != NULL) while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
{
if (current_entry->db_scheduler_handle != NULL)
{
terminate_background_worker(current_entry->db_scheduler_handle); terminate_background_worker(current_entry->db_scheduler_handle);
pfree(current_entry->db_scheduler_handle);
}
}
hash_destroy(db_htab); hash_destroy(db_htab);
} }

View File

@ -397,6 +397,11 @@ send_ack(dsm_segment *seg, bool success)
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
} }
/* we are responsible for pfree'ing the handle, the dsm infrastructure only
* deals with the queue itself
*/
pfree(ack_queue_handle);
if (ack_res != SHM_MQ_SUCCESS) if (ack_res != SHM_MQ_SUCCESS)
return SEND_FAILURE; return SEND_FAILURE;

1
test/sql/.gitignore vendored
View File

@ -13,3 +13,4 @@
/sql_query_results_unoptimized-*.sql /sql_query_results_unoptimized-*.sql
/sql_query_results_x_diff-*.sql /sql_query_results_x_diff-*.sql
/sql_query-*.sql /sql_query-*.sql

View File

@ -324,7 +324,7 @@ static bool
test_job_dispatcher(BgwJob *job) test_job_dispatcher(BgwJob *job)
{ {
ts_register_emit_log_hook(); ts_register_emit_log_hook();
ts_bgw_log_set_application_name(NameStr(job->fd.application_name)); ts_bgw_log_set_application_name(strdup(NameStr(job->fd.application_name)));
StartTransactionCommand(); StartTransactionCommand();
ts_params_get(); ts_params_get();