From 1cc1b7c14e793d13f7f047f54babcc231caa36d4 Mon Sep 17 00:00:00 2001 From: Joshua Lockerman Date: Mon, 23 Sep 2019 16:19:58 -0400 Subject: [PATCH] Fix memory leaks part 1: BGWs This commit fixes memory leaks in the Background Worker infrastructure. Notably, it fixes a leak in the launcher and BGW scheduler where shared message queue handles would not be freed, and it fixes a leak where BackgroundWorkerHandles were not freed in the scheduler. --- scripts/suppressions/suppr_leak.txt | 8 ++++++ src/bgw/job.c | 42 ++++++++++++++++++++++++----- src/loader/bgw_launcher.c | 26 +++++++++++++++++- src/loader/bgw_message_queue.c | 5 ++++ test/sql/.gitignore | 1 + test/src/bgw/scheduler_mock.c | 2 +- 6 files changed, 75 insertions(+), 9 deletions(-) diff --git a/scripts/suppressions/suppr_leak.txt b/scripts/suppressions/suppr_leak.txt index 4dc97edd0..800ba6d81 100644 --- a/scripts/suppressions/suppr_leak.txt +++ b/scripts/suppressions/suppr_leak.txt @@ -17,3 +17,11 @@ leak:CheckMyDatabase #pg_dump leak:getSchemaData leak:dumpDumpableObject + +#should live as long as the process +leak:ShmemInitHash + +#test only functions +leak:deserialize_test_parameters +leak:ts_params_get +leak:test_job_dispatcher diff --git a/src/bgw/job.c b/src/bgw/job.c index 8ad280b7f..a6d1fcefa 100644 --- a/src/bgw/job.c +++ b/src/bgw/job.c @@ -125,13 +125,18 @@ ts_bgw_job_start(BgwJob *job, Oid user_uid) { int32 job_id = Int32GetDatum(job->fd.id); StringInfo si = makeStringInfo(); + BackgroundWorkerHandle *bgw_handle; /* Changing this requires changes to ts_bgw_job_entrypoint */ appendStringInfo(si, "%u %d", user_uid, job_id); - return ts_bgw_start_worker(job_entrypoint_function_name, - NameStr(job->fd.application_name), - si->data); + bgw_handle = ts_bgw_start_worker(job_entrypoint_function_name, + NameStr(job->fd.application_name), + si->data); + + pfree(si->data); + pfree(si); + return bgw_handle; } static JobType @@ -443,6 +448,7 @@ ts_bgw_job_execute(BgwJob *job) { case JOB_TYPE_VERSION_CHECK: { + bool next_start_set; /* * In the first 12 hours, we want telemetry to ping every * hour. After that initial period, we default to the @@ -457,10 +463,12 @@ ts_bgw_job_execute(BgwJob *job) Int32GetDatum(0), Float8GetDatum(0))); - return ts_bgw_job_run_and_set_next_start(job, - ts_telemetry_main_wrapper, - TELEMETRY_INITIAL_NUM_RUNS, - one_hour); + next_start_set = ts_bgw_job_run_and_set_next_start(job, + ts_telemetry_main_wrapper, + TELEMETRY_INITIAL_NUM_RUNS, + one_hour); + pfree(one_hour); + return next_start_set; } case JOB_TYPE_REORDER: case JOB_TYPE_DROP_CHUNKS: @@ -601,6 +609,16 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS) AbortCurrentTransaction(); StartTransactionCommand(); + /* Free the old job if it exists, it's no longer needed, and since it's + * in the TopMemoryContext it won't be freed otherwise. + */ + + if (job != NULL) + { + pfree(job); + job = NULL; + } + /* * Note that the mark_start happens in the scheduler right before the * job is launched. Try to get a lock on the job again. Because the error @@ -613,7 +631,11 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS) TXN_LOCK, /* block */ false); if (job != NULL) + { ts_bgw_job_stat_mark_end(job, JOB_FAILURE); + pfree(job); + job = NULL; + } CommitTransactionCommand(); /* @@ -636,6 +658,12 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS) ts_bgw_job_stat_mark_end(job, res); CommitTransactionCommand(); + if (job != NULL) + { + pfree(job); + job = NULL; + } + elog(DEBUG1, "exiting job %d with %s", job_id, (res == JOB_SUCCESS ? "success" : "failure")); PG_RETURN_VOID(); diff --git a/src/loader/bgw_launcher.c b/src/loader/bgw_launcher.c index a801e858d..99b2d9385 100644 --- a/src/loader/bgw_launcher.c +++ b/src/loader/bgw_launcher.c @@ -391,6 +391,7 @@ static void scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry) { Assert(entry->state == DISABLED); + Assert(entry->db_scheduler_handle == NULL); scheduler_modify_state(entry, ENABLED); } @@ -398,6 +399,7 @@ static void scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry) { Assert(entry->state == ENABLED); + Assert(entry->db_scheduler_handle == NULL); /* Reserve a spot for this scheduler with BGW counter */ if (!ts_bgw_total_workers_increment()) { @@ -412,6 +414,11 @@ scheduler_state_trans_started_to_allocated(DbHashEntry *entry) { Assert(entry->state == STARTED); Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED); + if (entry->db_scheduler_handle != NULL) + { + pfree(entry->db_scheduler_handle); + entry->db_scheduler_handle = NULL; + } scheduler_modify_state(entry, ALLOCATED); } @@ -422,9 +429,11 @@ scheduler_state_trans_allocated_to_started(DbHashEntry *entry) bool worker_registered; Assert(entry->state == ALLOCATED); + Assert(entry->db_scheduler_handle == NULL); worker_registered = register_entrypoint_for_db(entry->db_oid, entry->vxid, &entry->db_scheduler_handle); + if (!worker_registered) { report_error_on_worker_register_failure(entry); @@ -439,6 +448,7 @@ static void scheduler_state_trans_enabled_to_disabled(DbHashEntry *entry) { Assert(entry->state == ENABLED); + Assert(entry->db_scheduler_handle == NULL); scheduler_modify_state(entry, DISABLED); } @@ -446,6 +456,8 @@ static void scheduler_state_trans_allocated_to_disabled(DbHashEntry *entry) { Assert(entry->state == ALLOCATED); + Assert(entry->db_scheduler_handle == NULL); + ts_bgw_total_workers_decrement(); scheduler_modify_state(entry, DISABLED); } @@ -455,7 +467,13 @@ scheduler_state_trans_started_to_disabled(DbHashEntry *entry) { Assert(entry->state == STARTED); Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED); + ts_bgw_total_workers_decrement(); + if (entry->db_scheduler_handle != NULL) + { + pfree(entry->db_scheduler_handle); + entry->db_scheduler_handle = NULL; + } scheduler_modify_state(entry, DISABLED); } @@ -510,7 +528,13 @@ launcher_pre_shmem_cleanup(int code, Datum arg) * them anymore) */ while ((current_entry = hash_seq_search(&hash_seq)) != NULL) - terminate_background_worker(current_entry->db_scheduler_handle); + { + if (current_entry->db_scheduler_handle != NULL) + { + terminate_background_worker(current_entry->db_scheduler_handle); + pfree(current_entry->db_scheduler_handle); + } + } hash_destroy(db_htab); } diff --git a/src/loader/bgw_message_queue.c b/src/loader/bgw_message_queue.c index 5e3eefb32..99bb96ac8 100644 --- a/src/loader/bgw_message_queue.c +++ b/src/loader/bgw_message_queue.c @@ -397,6 +397,11 @@ send_ack(dsm_segment *seg, bool success) ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } + + /* we are responsible for pfree'ing the handle, the dsm infrastructure only + * deals with the queue itself + */ + pfree(ack_queue_handle); if (ack_res != SHM_MQ_SUCCESS) return SEND_FAILURE; diff --git a/test/sql/.gitignore b/test/sql/.gitignore index 344212af4..e6a016a45 100644 --- a/test/sql/.gitignore +++ b/test/sql/.gitignore @@ -13,3 +13,4 @@ /sql_query_results_unoptimized-*.sql /sql_query_results_x_diff-*.sql /sql_query-*.sql + diff --git a/test/src/bgw/scheduler_mock.c b/test/src/bgw/scheduler_mock.c index 79c634564..4777921d5 100644 --- a/test/src/bgw/scheduler_mock.c +++ b/test/src/bgw/scheduler_mock.c @@ -324,7 +324,7 @@ static bool test_job_dispatcher(BgwJob *job) { ts_register_emit_log_hook(); - ts_bgw_log_set_application_name(NameStr(job->fd.application_name)); + ts_bgw_log_set_application_name(strdup(NameStr(job->fd.application_name))); StartTransactionCommand(); ts_params_get();