From 96d2acea30e6cc27ab624e5a155f6b57767ac411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Wed, 3 May 2023 14:05:47 +0200 Subject: [PATCH] Cleanup PGresults on transaction end Fix a regression due to a previous change in c571d54c. That change unintentionally removed the cleanup of PGresults at the end of transactions. Add back this functionality in order to reduce memory usage. --- tsl/src/remote/connection.c | 146 +++++++++++++++++++++++- tsl/test/expected/remote_connection.out | 2 +- tsl/test/src/remote/connection.c | 22 ++-- 3 files changed, 156 insertions(+), 14 deletions(-) diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 30634dca9..4bbe28ad1 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -149,8 +149,9 @@ list_insert_after(ListNode *entry, ListNode *prev) */ typedef struct ResultEntry { - struct ListNode ln; /* Must be first entry */ - TSConnection *conn; /* The connection the result was created on */ + struct ListNode ln; /* Must be first entry */ + TSConnection *conn; /* The connection the result was created on */ + SubTransactionId subtxid; /* The subtransaction ID that created this result, if any. */ PGresult *result; } ResultEntry; @@ -450,11 +451,17 @@ handle_result_create(PGEventResultCreate *event) entry->ln.next = entry->ln.prev = NULL; entry->conn = conn; entry->result = event->result; + entry->subtxid = GetCurrentSubTransactionId(); + /* Add entry as new head and set instance data */ list_insert_after(&entry->ln, &conn->results); PQresultSetInstanceData(event->result, eventproc, entry); - elog(DEBUG3, "created result %p on connection %p", event->result, conn); + elog(DEBUG3, + "created result %p on connection %p subtxid %u", + event->result, + conn, + entry->subtxid); connstats.results_created++; @@ -475,10 +482,9 @@ handle_result_destroy(PGEventResultDestroy *event) /* Detach entry */ list_detach(&entry->ln); - elog(DEBUG3, "destroyed result %p", entry->result); + elog(DEBUG3, "destroyed result %p for subtxnid %u", entry->result, entry->subtxid); pfree(entry); - connstats.results_cleared++; return EVENTPROC_SUCCESS; @@ -2549,13 +2555,143 @@ remote_connection_stats_get(void) } #endif +/* + * Cleanup connections and results at the end of a (sub-)transaction. + * + * This function is called at the end of transactions and sub-transactions to + * auto-cleanup connections and result objects. + */ +static void +remote_connections_xact_cleanup(SubTransactionId subtxid, bool isabort) +{ + ListNode *curr = connections.next; + unsigned int num_connections = 0; + unsigned int num_results = 0; + + while (curr != &connections) + { + TSConnection *conn = (TSConnection *) curr; + + /* Move to next connection since closing the current one might + * otherwise make the curr pointer invalid. */ + curr = curr->next; + + /* We're not closing the connection, but we should clean up any + * lingering results */ + ListNode *curr_result = conn->results.next; + + while (curr_result != &conn->results) + { + ResultEntry *entry = (ResultEntry *) curr_result; + + curr_result = curr_result->next; + + if (subtxid == InvalidSubTransactionId || subtxid == entry->subtxid) + { + PQclear(entry->result); + num_results++; + } + } + } + + if (subtxid == InvalidSubTransactionId) + elog(DEBUG3, + "cleaned up %u connections and %u results at %s of transaction", + num_connections, + num_results, + isabort ? "abort" : "commit"); + else + elog(DEBUG3, + "cleaned up %u connections and %u results at %s of sub-transaction %u", + num_connections, + num_results, + isabort ? "abort" : "commit", + subtxid); +} + +static void +remote_connection_xact_end(XactEvent event, void *unused_arg) +{ + /* + * We are deep down in CommitTransaction code path. We do not want our + * emit_log_hook_callback to interfere since it uses its own transaction + */ + emit_log_hook_type prev_emit_log_hook = emit_log_hook; + emit_log_hook = NULL; + + switch (event) + { + case XACT_EVENT_ABORT: + case XACT_EVENT_PARALLEL_ABORT: + /* + * We expect that the waitpoint will be retried and then we + * will return due to the process receiving a SIGTERM if + * the advisory lock is exclusively held by a user call + */ + DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); + remote_connections_xact_cleanup(InvalidSubTransactionId, true); + break; + case XACT_EVENT_COMMIT: + case XACT_EVENT_PARALLEL_COMMIT: + /* Same retry behavior as above */ + DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); + remote_connections_xact_cleanup(InvalidSubTransactionId, false); + break; + case XACT_EVENT_PREPARE: + /* + * We expect that the waitpoint will be retried and then we + * will return with a warning on crossing the retry count if + * the advisory lock is exclusively held by a user call + */ + DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); + break; + default: + /* other events are too early to use DEBUG_WAITPOINT.. */ + break; + } + + /* re-enable the emit_log_hook */ + emit_log_hook = prev_emit_log_hook; +} + +static void +remote_connection_subxact_end(SubXactEvent event, SubTransactionId subtxid, + SubTransactionId parent_subtxid, void *unused_arg) +{ + /* + * We are deep down in CommitTransaction code path. We do not want our + * emit_log_hook_callback to interfere since it uses its own transaction + */ + emit_log_hook_type prev_emit_log_hook = emit_log_hook; + emit_log_hook = NULL; + + switch (event) + { + case SUBXACT_EVENT_ABORT_SUB: + remote_connections_xact_cleanup(subtxid, true); + break; + case SUBXACT_EVENT_COMMIT_SUB: + remote_connections_xact_cleanup(subtxid, false); + break; + default: + break; + } + + /* re-enable the emit_log_hook */ + emit_log_hook = prev_emit_log_hook; +} + void _remote_connection_init(void) { + RegisterXactCallback(remote_connection_xact_end, NULL); + RegisterSubXactCallback(remote_connection_subxact_end, NULL); unset_libpq_envvar(); } void _remote_connection_fini(void) { + UnregisterXactCallback(remote_connection_xact_end, NULL); + UnregisterSubXactCallback(remote_connection_subxact_end, NULL); } diff --git a/tsl/test/expected/remote_connection.out b/tsl/test/expected/remote_connection.out index 00436d31f..cadc8d864 100644 --- a/tsl/test/expected/remote_connection.out +++ b/tsl/test/expected/remote_connection.out @@ -55,7 +55,7 @@ SELECT * FROM test.get_connection_stats(); \set ON_ERROR_STOP 0 SELECT test.send_remote_query_that_generates_exception(); ERROR: XX000: bad query error thrown from test -LOCATION: ts_test_bad_remote_query, connection.c:216 +LOCATION: ts_test_bad_remote_query, connection.c:222 \set ON_ERROR_STOP 1 SELECT * FROM test.get_connection_stats(); connections_created | connections_closed | results_created | results_cleared diff --git a/tsl/test/src/remote/connection.c b/tsl/test/src/remote/connection.c index a5e6ae862..01de5ef91 100644 --- a/tsl/test/src/remote/connection.c +++ b/tsl/test/src/remote/connection.c @@ -170,31 +170,37 @@ test_connection_and_result_leaks() ASSERT_NUM_OPEN_RESULTS(stats, 3); RollbackAndReleaseCurrentSubTransaction(); - /* The connection created in the rolled back transaction should be + + /* The results in the rolled-back transaction should be closed */ + ASSERT_NUM_OPEN_RESULTS(stats, 1); + + /* The connection created in the rolled-back transaction should be * destroyed */ ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); remote_connection_exec(subconn, "SELECT 1"); - ASSERT_NUM_OPEN_RESULTS(stats, 4); + ASSERT_NUM_OPEN_RESULTS(stats, 2); /* Note that releasing/committing the subtransaction does not delete the memory */ ReleaseCurrentSubTransaction(); + /* No remaining results */ + ASSERT_NUM_OPEN_RESULTS(stats, 0); + /* The original and subconn still exists */ ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); - ASSERT_NUM_OPEN_RESULTS(stats, 4); remote_connection_exec(conn, "SELECT 1"); - ASSERT_NUM_OPEN_RESULTS(stats, 5); + ASSERT_NUM_OPEN_RESULTS(stats, 1); MemoryContextSwitchTo(old); MemoryContextDelete(mcxt); - /* Original connection should be cleaned up along with its 3 results. The - * subconn object was created on a subtransaction memory context that will - * be cleared when the main transaction ends. */ + /* Original connection should be cleaned up along with all its + * results. The subconn object, however, was created on a subtransaction + * memory context that will be cleared when the main transaction ends. */ ASSERT_NUM_OPEN_CONNECTIONS(stats, 1); - ASSERT_NUM_OPEN_RESULTS(stats, 2); + ASSERT_NUM_OPEN_RESULTS(stats, 0); remote_connection_stats_reset(); }