Minor refactor of distributed xact handling

Clean up some inconsistencies and address nits in abort handlers for
distributed transactions.
This commit is contained in:
Erik Nordström 2019-11-28 16:51:47 +01:00 committed by Erik Nordström
parent 506b1189b1
commit c0e0f673b8
5 changed files with 61 additions and 34 deletions

View File

@ -146,7 +146,16 @@ dist_txn_xact_callback_1pc_pre_commit()
dist_txn_deallocate_prepared_stmts_if_needed(); dist_txn_deallocate_prepared_stmts_if_needed();
} }
/* On abort on the frontend send aborts to all of the remote endpoints */ /*
* Abort on the access node.
*
* The access node needs to send aborts to all of the remote endpoints. This
* code should not throw errors itself, since we are already in abort due to a
* previous error. Instead, we try to emit errors as warnings. For safety, we
 * should probably try-catch and swallow any potential lower-layer errors given
* that we're doing remote calls over the network. But the semantics for
* capturing and proceeding after such recursive errors are unclear.
*/
static void static void
dist_txn_xact_callback_abort() dist_txn_xact_callback_abort()
{ {
@ -493,15 +502,19 @@ dist_txn_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (store == NULL) if (store == NULL)
return; return;
switch (event)
{
case SUBXACT_EVENT_START_SUB:
case SUBXACT_EVENT_COMMIT_SUB:
/* Nothing to do at subxact start, nor after commit. */ /* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || event == SUBXACT_EVENT_ABORT_SUB))
return; return;
case SUBXACT_EVENT_PRE_COMMIT_SUB:
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
reject_transactions_with_incomplete_transitions(); reject_transactions_with_incomplete_transitions();
break;
if (event == SUBXACT_EVENT_ABORT_SUB) case SUBXACT_EVENT_ABORT_SUB:
eventcallback(DTXN_EVENT_SUB_XACT_ABORT); eventcallback(DTXN_EVENT_SUB_XACT_ABORT);
break;
}
curlevel = GetCurrentTransactionNestLevel(); curlevel = GetCurrentTransactionNestLevel();

View File

@ -45,7 +45,7 @@ extern void remote_dist_txn_set_event_handler(const DistTransactionEventHandler
extern DistTransactionEvent remote_dist_txn_event_from_name(const char *eventname); extern DistTransactionEvent remote_dist_txn_event_from_name(const char *eventname);
extern const char *remote_dist_txn_event_name(const DistTransactionEvent event); extern const char *remote_dist_txn_event_name(const DistTransactionEvent event);
#endif #endif /* DEBUG */
void _remote_dist_txn_init(void); void _remote_dist_txn_init(void);
void _remote_dist_txn_fini(void); void _remote_dist_txn_fini(void);

View File

@ -3,6 +3,7 @@
* Please see the included NOTICE for copyright information and * Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license. * LICENSE-TIMESCALE for a copy of the license.
*/ */
#include "libpq-fe.h"
#include <postgres.h> #include <postgres.h>
#include <access/xact.h> #include <access/xact.h>
#include <utils/builtins.h> #include <utils/builtins.h>
@ -299,14 +300,18 @@ remote_txn_check_for_leaked_prepared_statements(RemoteTxn *entry)
{ {
PGresult *res; PGresult *res;
char *count_string; char *count_string;
ExecStatusType status;
if (PQTRANS_IDLE != PQtransactionStatus(remote_connection_get_pg_conn(entry->conn))) if (PQTRANS_IDLE != PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)))
return; return;
res = remote_connection_exec(entry->conn, "SELECT count(*) FROM pg_prepared_statements"); res = remote_connection_exec(entry->conn, "SELECT count(*) FROM pg_prepared_statements");
if (PQresultStatus(res) == PGRES_TUPLES_OK) status = PQresultStatus(res);
switch (status)
{ {
case PGRES_TUPLES_OK:
if (PQntuples(res) == 1 && PQnfields(res) == 1) if (PQntuples(res) == 1 && PQnfields(res) == 1)
{ {
count_string = PQgetvalue(res, 0, 0); count_string = PQgetvalue(res, 0, 0);
@ -314,13 +319,16 @@ remote_txn_check_for_leaked_prepared_statements(RemoteTxn *entry)
elog(WARNING, "leak check: connection leaked prepared statement"); elog(WARNING, "leak check: connection leaked prepared statement");
} }
else else
{ elog(ERROR, "leak check: unexpected number of rows or columns returned");
remote_result_close(res); break;
elog(ERROR, "leak check: incorrect number of rows or columns returned"); case PGRES_FATAL_ERROR:
case PGRES_NONFATAL_ERROR:
elog(WARNING, "leak check: ERROR [\"%s\"]", PQresultErrorMessage(res));
break;
default:
elog(WARNING, "leak check: unexpected result state %u", status);
break;
} }
}
else
elog(WARNING, "leak check: unexpected result \"%s\"", PQresultErrorMessage(res));
remote_result_close(res); remote_result_close(res);
} }

View File

@ -25,10 +25,6 @@
#include "test_utils.h" #include "test_utils.h"
#include "connection.h" #include "connection.h"
static const char *sql_get_backend_pid = "SELECT pg_backend_pid()";
static const char *sql_get_application_name =
"SELECT application_name from pg_stat_activity where pid = pg_backend_pid()";
TSConnection * TSConnection *
get_connection() get_connection()
{ {
@ -250,10 +246,10 @@ remote_connection_get_remote_pid(const TSConnection *conn)
char *pid_string; char *pid_string;
unsigned long pid_long; unsigned long pid_long;
res = PQexec(remote_connection_get_pg_conn(conn), sql_get_backend_pid); res = PQexec(remote_connection_get_pg_conn(conn), "SELECT pg_backend_pid()");
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
return 0; return -1;
Assert(1 == PQntuples(res)); Assert(1 == PQntuples(res));
Assert(1 == PQnfields(res)); Assert(1 == PQnfields(res));
@ -271,7 +267,10 @@ remote_connection_get_application_name(const TSConnection *conn)
PGresult *res; PGresult *res;
char *app_name; char *app_name;
res = PQexec(remote_connection_get_pg_conn(conn), sql_get_application_name); res = PQexec(remote_connection_get_pg_conn(conn),
"SELECT application_name "
"FROM pg_stat_activity "
"WHERE pid = pg_backend_pid()");
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
return 0; return 0;

View File

@ -51,9 +51,14 @@ void
remote_node_killer_init(RemoteNodeKiller *rnk, const TSConnection *conn, remote_node_killer_init(RemoteNodeKiller *rnk, const TSConnection *conn,
const DistTransactionEvent event) const DistTransactionEvent event)
{ {
int remote_pid = remote_connection_get_remote_pid(conn);
if (remote_pid == -1)
elog(ERROR, "could not get PID of remote backend process");
MemSet(rnk, 0, sizeof(*rnk)); MemSet(rnk, 0, sizeof(*rnk));
rnk->conn = conn; rnk->conn = conn;
rnk->pid = remote_connection_get_remote_pid(conn); rnk->pid = remote_pid;
rnk->killevent = event; rnk->killevent = event;
} }
@ -118,10 +123,12 @@ remote_node_killer_kill(RemoteNodeKiller *rnk)
elog(WARNING, "timeout while waiting for killed backend to exit"); elog(WARNING, "timeout while waiting for killed backend to exit");
break; break;
} }
wait_count--; wait_count--;
pg_usleep(100L); pg_usleep(100L);
} }
/* Once PG registered the backend as gone, wait some additional time
* for it to really exit */
pg_usleep(500L);
} }
} }