1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-17 11:03:36 +08:00

Cleanup PGresults on transaction end

Fix a regression due to a previous change in c571d54c. That change
unintentionally removed the cleanup of PGresults at the end of
transactions. Add back this functionality in order to reduce memory
usage.
This commit is contained in:
Erik Nordström 2023-05-03 14:05:47 +02:00 committed by Erik Nordström
parent f250eaa631
commit 96d2acea30
3 changed files with 156 additions and 14 deletions
tsl
src/remote
test

@ -149,8 +149,9 @@ list_insert_after(ListNode *entry, ListNode *prev)
*/
typedef struct ResultEntry
{
struct ListNode ln; /* Must be first entry */
TSConnection *conn; /* The connection the result was created on */
struct ListNode ln; /* Must be first entry */
TSConnection *conn; /* The connection the result was created on */
SubTransactionId subtxid; /* The subtransaction ID that created this result, if any. */
PGresult *result;
} ResultEntry;
@ -450,11 +451,17 @@ handle_result_create(PGEventResultCreate *event)
entry->ln.next = entry->ln.prev = NULL;
entry->conn = conn;
entry->result = event->result;
entry->subtxid = GetCurrentSubTransactionId();
/* Add entry as new head and set instance data */
list_insert_after(&entry->ln, &conn->results);
PQresultSetInstanceData(event->result, eventproc, entry);
elog(DEBUG3, "created result %p on connection %p", event->result, conn);
elog(DEBUG3,
"created result %p on connection %p subtxid %u",
event->result,
conn,
entry->subtxid);
connstats.results_created++;
@ -475,10 +482,9 @@ handle_result_destroy(PGEventResultDestroy *event)
/* Detach entry */
list_detach(&entry->ln);
elog(DEBUG3, "destroyed result %p", entry->result);
elog(DEBUG3, "destroyed result %p for subtxnid %u", entry->result, entry->subtxid);
pfree(entry);
connstats.results_cleared++;
return EVENTPROC_SUCCESS;
@ -2549,13 +2555,143 @@ remote_connection_stats_get(void)
}
#endif
/*
* Cleanup connections and results at the end of a (sub-)transaction.
*
* This function is called at the end of transactions and sub-transactions to
* auto-cleanup connections and result objects.
*/
static void
remote_connections_xact_cleanup(SubTransactionId subtxid, bool isabort)
{
ListNode *curr = connections.next;
unsigned int num_connections = 0;
unsigned int num_results = 0;
while (curr != &connections)
{
TSConnection *conn = (TSConnection *) curr;
/* Move to next connection since closing the current one might
* otherwise make the curr pointer invalid. */
curr = curr->next;
/* We're not closing the connection, but we should clean up any
* lingering results */
ListNode *curr_result = conn->results.next;
while (curr_result != &conn->results)
{
ResultEntry *entry = (ResultEntry *) curr_result;
curr_result = curr_result->next;
if (subtxid == InvalidSubTransactionId || subtxid == entry->subtxid)
{
PQclear(entry->result);
num_results++;
}
}
}
if (subtxid == InvalidSubTransactionId)
elog(DEBUG3,
"cleaned up %u connections and %u results at %s of transaction",
num_connections,
num_results,
isabort ? "abort" : "commit");
else
elog(DEBUG3,
"cleaned up %u connections and %u results at %s of sub-transaction %u",
num_connections,
num_results,
isabort ? "abort" : "commit",
subtxid);
}
static void
remote_connection_xact_end(XactEvent event, void *unused_arg)
{
/*
* We are deep down in CommitTransaction code path. We do not want our
* emit_log_hook_callback to interfere since it uses its own transaction
*/
emit_log_hook_type prev_emit_log_hook = emit_log_hook;
emit_log_hook = NULL;
switch (event)
{
case XACT_EVENT_ABORT:
case XACT_EVENT_PARALLEL_ABORT:
/*
* We expect that the waitpoint will be retried and then we
* will return due to the process receiving a SIGTERM if
* the advisory lock is exclusively held by a user call
*/
DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
remote_connections_xact_cleanup(InvalidSubTransactionId, true);
break;
case XACT_EVENT_COMMIT:
case XACT_EVENT_PARALLEL_COMMIT:
/* Same retry behavior as above */
DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
remote_connections_xact_cleanup(InvalidSubTransactionId, false);
break;
case XACT_EVENT_PREPARE:
/*
* We expect that the waitpoint will be retried and then we
* will return with a warning on crossing the retry count if
* the advisory lock is exclusively held by a user call
*/
DEBUG_RETRY_WAITPOINT("remote_conn_xact_end");
break;
default:
/* other events are too early to use DEBUG_WAITPOINT.. */
break;
}
/* re-enable the emit_log_hook */
emit_log_hook = prev_emit_log_hook;
}
static void
remote_connection_subxact_end(SubXactEvent event, SubTransactionId subtxid,
SubTransactionId parent_subtxid, void *unused_arg)
{
/*
* We are deep down in CommitTransaction code path. We do not want our
* emit_log_hook_callback to interfere since it uses its own transaction
*/
emit_log_hook_type prev_emit_log_hook = emit_log_hook;
emit_log_hook = NULL;
switch (event)
{
case SUBXACT_EVENT_ABORT_SUB:
remote_connections_xact_cleanup(subtxid, true);
break;
case SUBXACT_EVENT_COMMIT_SUB:
remote_connections_xact_cleanup(subtxid, false);
break;
default:
break;
}
/* re-enable the emit_log_hook */
emit_log_hook = prev_emit_log_hook;
}
void
_remote_connection_init(void)
{
RegisterXactCallback(remote_connection_xact_end, NULL);
RegisterSubXactCallback(remote_connection_subxact_end, NULL);
unset_libpq_envvar();
}
void
_remote_connection_fini(void)
{
UnregisterXactCallback(remote_connection_xact_end, NULL);
UnregisterSubXactCallback(remote_connection_subxact_end, NULL);
}

@ -55,7 +55,7 @@ SELECT * FROM test.get_connection_stats();
\set ON_ERROR_STOP 0
SELECT test.send_remote_query_that_generates_exception();
ERROR: XX000: bad query error thrown from test
LOCATION: ts_test_bad_remote_query, connection.c:216
LOCATION: ts_test_bad_remote_query, connection.c:222
\set ON_ERROR_STOP 1
SELECT * FROM test.get_connection_stats();
connections_created | connections_closed | results_created | results_cleared

@ -170,31 +170,37 @@ test_connection_and_result_leaks()
ASSERT_NUM_OPEN_RESULTS(stats, 3);
RollbackAndReleaseCurrentSubTransaction();
/* The connection created in the rolled back transaction should be
/* The results in the rolled-back transaction should be closed */
ASSERT_NUM_OPEN_RESULTS(stats, 1);
/* The connection created in the rolled-back transaction should be
* destroyed */
ASSERT_NUM_OPEN_CONNECTIONS(stats, 2);
remote_connection_exec(subconn, "SELECT 1");
ASSERT_NUM_OPEN_RESULTS(stats, 4);
ASSERT_NUM_OPEN_RESULTS(stats, 2);
/* Note that releasing/committing the subtransaction does not delete the memory */
ReleaseCurrentSubTransaction();
/* No remaining results */
ASSERT_NUM_OPEN_RESULTS(stats, 0);
/* The original and subconn still exists */
ASSERT_NUM_OPEN_CONNECTIONS(stats, 2);
ASSERT_NUM_OPEN_RESULTS(stats, 4);
remote_connection_exec(conn, "SELECT 1");
ASSERT_NUM_OPEN_RESULTS(stats, 5);
ASSERT_NUM_OPEN_RESULTS(stats, 1);
MemoryContextSwitchTo(old);
MemoryContextDelete(mcxt);
/* Original connection should be cleaned up along with its 3 results. The
* subconn object was created on a subtransaction memory context that will
* be cleared when the main transaction ends. */
/* Original connection should be cleaned up along with all its
* results. The subconn object, however, was created on a subtransaction
* memory context that will be cleared when the main transaction ends. */
ASSERT_NUM_OPEN_CONNECTIONS(stats, 1);
ASSERT_NUM_OPEN_RESULTS(stats, 2);
ASSERT_NUM_OPEN_RESULTS(stats, 0);
remote_connection_stats_reset();
}