diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 32f8e9e74..d2697cde5 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -662,7 +662,7 @@ remote_connection_xact_is_transitioning(const TSConnection *conn) } PGconn * -remote_connection_get_pg_conn(TSConnection *conn) +remote_connection_get_pg_conn(const TSConnection *conn) { Assert(conn != NULL); return conn->pg_conn; diff --git a/tsl/src/remote/connection.h b/tsl/src/remote/connection.h index 218148b2a..56b85d3d1 100644 --- a/tsl/src/remote/connection.h +++ b/tsl/src/remote/connection.h @@ -73,7 +73,7 @@ extern bool remote_connection_check_extension(TSConnection *conn, const char **o extern void remote_validate_extension_version(TSConnection *conn, const char *data_node_version); extern bool remote_connection_cancel_query(TSConnection *conn); -extern PGconn *remote_connection_get_pg_conn(TSConnection *conn); +extern PGconn *remote_connection_get_pg_conn(const TSConnection *conn); extern bool remote_connection_is_processing(const TSConnection *conn); extern void remote_connection_set_processing(TSConnection *conn, bool processing); extern bool remote_connection_configure_if_changed(TSConnection *conn); diff --git a/tsl/src/remote/dist_txn.c b/tsl/src/remote/dist_txn.c index 7d32f28ab..daf66aeed 100644 --- a/tsl/src/remote/dist_txn.c +++ b/tsl/src/remote/dist_txn.c @@ -20,16 +20,56 @@ #ifdef DEBUG -void (*testing_callback_call_hook)(const char *event) = NULL; +static const DistTransactionEventHandler *event_handler = NULL; +static const char *eventnames[MAX_DTXN_EVENT] = { + [DTXN_EVENT_ANY] = "any", + [DTXN_EVENT_PRE_COMMIT] = "pre-commit", + [DTXN_EVENT_WAIT_COMMIT] = "waiting-commit", + [DTXN_EVENT_PRE_ABORT] = "pre-abort", + [DTXN_EVENT_PRE_PREPARE] = "pre-prepare-transaction", + [DTXN_EVENT_WAIT_PREPARE] = "waiting-prepare-transaction", + [DTXN_EVENT_POST_PREPARE] = "post-prepare-transaction", + [DTXN_EVENT_PRE_COMMIT_PREPARED] = "pre-commit-prepared", + [DTXN_EVENT_WAIT_COMMIT_PREPARED] = "waiting-commit-prepared", + [DTXN_EVENT_SUB_XACT_ABORT] = "subxact-abort", +}; + +void +remote_dist_txn_set_event_handler(const DistTransactionEventHandler *handler) +{ + event_handler = handler; +} + +static inline void +eventcallback(const DistTransactionEvent event) +{ + if (NULL != event_handler && NULL != event_handler->handler) + event_handler->handler(event, event_handler->data); +} + +DistTransactionEvent +remote_dist_txn_event_from_name(const char *eventname) +{ + int i; + + for (i = 0; i < MAX_DTXN_EVENT; i++) + { + if (strcmp(eventname, eventnames[i]) == 0) + return i; + } + + elog(ERROR, "invalid event name"); + pg_unreachable(); +} + +const char * +remote_dist_txn_event_name(const DistTransactionEvent event) +{ + return eventnames[event]; +} -#define testing_callback_call(event) \ - do \ - { \ - if (testing_callback_call_hook != NULL) \ - testing_callback_call_hook(event); \ - } while (0) #else -#define testing_callback_call(event) \ +#define eventcallback(event) \ do \ { \ } while (0) @@ -88,7 +128,7 @@ dist_txn_xact_callback_1pc_pre_commit() RemoteTxn *remote_txn; AsyncRequestSet *ars = async_request_set_create(); - testing_callback_call("pre-commit"); + eventcallback(DTXN_EVENT_PRE_COMMIT); /* send a commit to all connections */ remote_txn_store_foreach(store, remote_txn) @@ -99,7 +139,7 @@ dist_txn_xact_callback_1pc_pre_commit() async_request_set_add(ars, remote_txn_async_send_commit(remote_txn)); } - testing_callback_call("waiting-commit"); + eventcallback(DTXN_EVENT_WAIT_COMMIT); /* async collect all the replies */ async_request_set_wait_all_ok_commands(ars); @@ -112,7 +152,7 @@ dist_txn_xact_callback_abort() { RemoteTxn *remote_txn; - testing_callback_call("pre-abort"); + eventcallback(DTXN_EVENT_PRE_ABORT); remote_txn_store_foreach(store, remote_txn) { @@ -245,7 +285,7 @@ dist_txn_send_prepare_transaction() AsyncResponse *error_response = NULL; AsyncResponse *res; - testing_callback_call("pre-prepare-transaction"); + eventcallback(DTXN_EVENT_PRE_PREPARE); /* send a prepare transaction to all connections */ remote_txn_store_foreach(store, remote_txn) @@ -257,7 +297,7 @@ dist_txn_send_prepare_transaction() async_request_set_add(ars, req); } - testing_callback_call("waiting-prepare-transaction"); + eventcallback(DTXN_EVENT_WAIT_PREPARE); /* * async collect the replies. Since errors in PREPARE TRANSACTION are not @@ -303,7 +343,7 @@ dist_txn_send_prepare_transaction() if (error_response != NULL) async_response_report_error(error_response, ERROR); - testing_callback_call("post-prepare-transaction"); + eventcallback(DTXN_EVENT_POST_PREPARE); } static void @@ -332,7 +372,7 @@ dist_txn_send_commit_prepared_transaction() async_request_set_add(ars, req); } - testing_callback_call("waiting-commit-prepared"); + eventcallback(DTXN_EVENT_WAIT_COMMIT_PREPARED); /* async collect the replies */ while ((res = async_request_set_wait_any_response(ars))) @@ -397,7 +437,7 @@ dist_txn_xact_callback_2pc(XactEvent event, void *arg) break; case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_COMMIT: - testing_callback_call("pre-commit-prepared"); + eventcallback(DTXN_EVENT_PRE_COMMIT_PREPARED); /* * We send a commit here so that future commands on this @@ -461,7 +501,7 @@ dist_txn_subxact_callback(SubXactEvent event, SubTransactionId mySubid, reject_transactions_with_incomplete_transitions(); if (event == SUBXACT_EVENT_ABORT_SUB) - testing_callback_call("subxact-abort"); + eventcallback(DTXN_EVENT_SUB_XACT_ABORT); curlevel = GetCurrentTransactionNestLevel(); diff --git a/tsl/src/remote/dist_txn.h b/tsl/src/remote/dist_txn.h index 8ee7d1365..90e28d471 100644 --- a/tsl/src/remote/dist_txn.h +++ b/tsl/src/remote/dist_txn.h @@ -18,7 +18,33 @@ extern TSConnection *remote_dist_txn_get_connection(TSConnectionId id, RemoteTxnPrepStmtOption prep_stmt); #ifdef DEBUG -extern void (*testing_callback_call_hook)(const char *event); + +typedef enum DistTransactionEvent +{ + DTXN_EVENT_ANY, + DTXN_EVENT_PRE_COMMIT, + DTXN_EVENT_WAIT_COMMIT, + DTXN_EVENT_PRE_ABORT, + DTXN_EVENT_PRE_PREPARE, + DTXN_EVENT_WAIT_PREPARE, + DTXN_EVENT_POST_PREPARE, + DTXN_EVENT_PRE_COMMIT_PREPARED, + DTXN_EVENT_WAIT_COMMIT_PREPARED, + DTXN_EVENT_SUB_XACT_ABORT, +} DistTransactionEvent; + +#define MAX_DTXN_EVENT (DTXN_EVENT_SUB_XACT_ABORT + 1) + +typedef struct DistTransactionEventHandler +{ + void (*handler)(const DistTransactionEvent event, void *data); + void *data; +} DistTransactionEventHandler; + +extern void remote_dist_txn_set_event_handler(const DistTransactionEventHandler *handler); +extern DistTransactionEvent remote_dist_txn_event_from_name(const char *eventname); +extern const char *remote_dist_txn_event_name(const DistTransactionEvent event); + #endif void _remote_dist_txn_init(void); diff --git a/tsl/test/src/remote/async.c b/tsl/test/src/remote/async.c index f3bbf54bf..c96a269d4 100644 --- a/tsl/test/src/remote/async.c +++ b/tsl/test/src/remote/async.c @@ -21,6 +21,7 @@ #include "connection.h" #include "test_utils.h" #include "node_killer.h" +#include "remote/dist_txn.h" TS_FUNCTION_INFO_V1(ts_test_remote_async); @@ -204,13 +205,13 @@ test_node_death() AsyncResponse *response; PGresult *pg_res; AsyncRequestSet *set; - RemoteNodeKiller *rnk; + RemoteNodeKiller rnk; /* killed node causes an error response, then a communication error */ - rnk = remote_node_killer_init(conn); + remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY); set = async_request_set_create(); async_request_set_add_sql(set, conn, "SELECT 1"); - remote_node_killer_kill(rnk); + remote_node_killer_kill(&rnk); response = async_request_set_wait_any_response_deadline(set, TS_NO_TIMEOUT); TestAssertTrue(async_response_get_type(response) == RESPONSE_RESULT); result = (AsyncResponseResult *) response; @@ -233,10 +234,10 @@ test_node_death() /* test error throwing in async_request_set_wait_any_result */ conn = get_connection(); - rnk = remote_node_killer_init(conn); + remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY); set = async_request_set_create(); async_request_set_add_sql(set, conn, "SELECT 1"); - remote_node_killer_kill(rnk); + remote_node_killer_kill(&rnk); /* first we get error result */ TestAssertTrue(NULL != async_request_set_wait_any_result(set)); @@ -244,18 +245,19 @@ test_node_death() /* do cancel query before first response */ conn = get_connection(); - rnk = remote_node_killer_init(conn); + remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY); set = async_request_set_create(); async_request_set_add_sql(set, conn, "SELECT 1"); - remote_node_killer_kill(rnk); + + remote_node_killer_kill(&rnk); TestAssertTrue(false == remote_connection_cancel_query(conn)); /* do cancel query after seeing error */ conn = get_connection(); - rnk = remote_node_killer_init(conn); + remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY); set = async_request_set_create(); async_request_set_add_sql(set, conn, "SELECT 1"); - remote_node_killer_kill(rnk); + remote_node_killer_kill(&rnk); /* first we get error result */ TestEnsureError(async_request_set_wait_ok_result(set)); diff --git a/tsl/test/src/remote/connection.c b/tsl/test/src/remote/connection.c index 0d95ed207..8b54f8a01 100644 --- a/tsl/test/src/remote/connection.c +++ b/tsl/test/src/remote/connection.c @@ -244,7 +244,7 @@ ts_test_remote_connection(PG_FUNCTION_ARGS) } pid_t -remote_connecton_get_remote_pid(TSConnection *conn) +remote_connection_get_remote_pid(const TSConnection *conn) { PGresult *res; char *pid_string; @@ -255,8 +255,8 @@ remote_connecton_get_remote_pid(TSConnection *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) return 0; - TestAssertTrue(1 == PQntuples(res)); - TestAssertTrue(1 == PQnfields(res)); + Assert(1 == PQntuples(res)); + Assert(1 == PQnfields(res)); pid_string = PQgetvalue(res, 0, 0); pid_long = strtol(pid_string, NULL, 10); @@ -266,7 +266,7 @@ remote_connecton_get_remote_pid(TSConnection *conn) } char * -remote_connecton_get_application_name(TSConnection *conn) +remote_connection_get_application_name(const TSConnection *conn) { PGresult *res; char *app_name; @@ -276,8 +276,8 @@ remote_connecton_get_application_name(TSConnection *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) return 0; - TestAssertTrue(1 == PQntuples(res)); - TestAssertTrue(1 == PQnfields(res)); + Assert(1 == PQntuples(res)); + Assert(1 == PQnfields(res)); app_name = pstrdup(PQgetvalue(res, 0, 0)); diff --git a/tsl/test/src/remote/connection.h b/tsl/test/src/remote/connection.h index e163db11b..23eafa80f 100644 --- a/tsl/test/src/remote/connection.h +++ b/tsl/test/src/remote/connection.h @@ -12,7 +12,7 @@ #include extern TSConnection *get_connection(void); -extern pid_t remote_connecton_get_remote_pid(TSConnection *conn); -extern char *remote_connecton_get_application_name(TSConnection *conn); +extern pid_t remote_connection_get_remote_pid(const TSConnection *conn); +extern char *remote_connection_get_application_name(const TSConnection *conn); #endif /* TIMESCALEDB_TSL_TEST_REMOTE_CONNECTION_H */ diff --git a/tsl/test/src/remote/connection_cache.c b/tsl/test/src/remote/connection_cache.c index 2812a4c8a..76abe113b 100644 --- a/tsl/test/src/remote/connection_cache.c +++ b/tsl/test/src/remote/connection_cache.c @@ -52,17 +52,17 @@ test_basic_cache() GetUserId()); conn_1 = remote_connection_cache_get_connection(id_1); - pid_1 = remote_connecton_get_remote_pid(conn_1); + pid_1 = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 != 0); conn_2 = remote_connection_cache_get_connection(id_2); - pid_2 = remote_connecton_get_remote_pid(conn_2); + pid_2 = remote_connection_get_remote_pid(conn_2); TestAssertTrue(pid_2 != 0); TestAssertTrue(pid_1 != pid_2); conn_1 = remote_connection_cache_get_connection(id_1); - pid_prime = remote_connecton_get_remote_pid(conn_1); + pid_prime = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 == pid_prime); } @@ -145,7 +145,7 @@ test_invalidate_server() GetUserId()); conn_1 = remote_connection_cache_get_connection(id_1); - pid_1 = remote_connecton_get_remote_pid(conn_1); + pid_1 = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 != 0); /* simulate an invalidation in another backend */ @@ -154,13 +154,13 @@ test_invalidate_server() /* Should get a different connection since we invalidated the foreign * server and didn't yet start a transaction on the remote node. */ conn_1 = remote_connection_cache_get_connection(id_1); - pid_prime = remote_connecton_get_remote_pid(conn_1); + pid_prime = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 != pid_prime); /* Test that connections remain despite invalidations during ongoing * remote transaction */ pid_1 = pid_prime; - original_application_name = remote_connecton_get_application_name(conn_1); + original_application_name = remote_connection_get_application_name(conn_1); BeginInternalSubTransaction("sub1"); /* Start a remote transaction on the connection */ @@ -168,20 +168,20 @@ test_invalidate_server() invalidate_server(); conn_1 = remote_connection_cache_get_connection(id_1); - pid_prime = remote_connecton_get_remote_pid(conn_1); + pid_prime = remote_connection_get_remote_pid(conn_1); /* Connection should be the same despite invalidation since we're in a * transaction */ TestAssertTrue(pid_1 == pid_prime); TestAssertTrue( - strcmp(original_application_name, remote_connecton_get_application_name(conn_1)) == 0); + strcmp(original_application_name, remote_connection_get_application_name(conn_1)) == 0); RollbackAndReleaseCurrentSubTransaction(); /* After rollback, we're still in a remote transaction. Connection should * be the same. */ conn_1 = remote_connection_cache_get_connection(id_1); - pid_prime = remote_connecton_get_remote_pid(conn_1); + pid_prime = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 == pid_prime); pid_1 = pid_prime; } @@ -199,14 +199,14 @@ test_remove() GetUserId()); conn_1 = remote_connection_cache_get_connection(id_1); - pid_1 = remote_connecton_get_remote_pid(conn_1); + pid_1 = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 != 0); remote_connection_cache_remove(id_1); /* even using the same pin, get new connection */ conn_1 = remote_connection_cache_get_connection(id_1); - pid_prime = remote_connecton_get_remote_pid(conn_1); + pid_prime = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 != pid_prime); pid_1 = pid_prime; @@ -215,7 +215,7 @@ test_remove() remote_connection_cache_remove(id_1); conn_1 = remote_connection_cache_get_connection(id_1); - pid_prime = remote_connecton_get_remote_pid(conn_1); + pid_prime = remote_connection_get_remote_pid(conn_1); TestAssertTrue(pid_1 != pid_prime); } diff --git a/tsl/test/src/remote/node_killer.c b/tsl/test/src/remote/node_killer.c index b68e5baaf..f45e559c1 100644 --- a/tsl/test/src/remote/node_killer.c +++ b/tsl/test/src/remote/node_killer.c @@ -17,88 +17,136 @@ #include "export.h" #include "remote/connection.h" #include "remote/dist_txn.h" + #include "test_utils.h" #include "connection.h" -typedef struct RemoteNodeKiller -{ - pid_t pid; - TSConnection *conn; -} RemoteNodeKiller; +static RemoteNodeKiller rnk_event = { + .conn = NULL, + .pid = -1, + .killevent = DTXN_EVENT_ANY, +}; -static char *kill_event = NULL; -static RemoteNodeKiller *rnk_event = NULL; +static void +kill_backend(const DistTransactionEvent event, void *data) +{ + RemoteNodeKiller *rnk = data; + + if (event == rnk->killevent || rnk->killevent == DTXN_EVENT_ANY) + { + elog(WARNING, "kill event: %s", remote_dist_txn_event_name(rnk->killevent)); + remote_node_killer_kill(&rnk_event); + remote_dist_txn_set_event_handler(NULL); + } +} + +static DistTransactionEventHandler eventhandler = { + .handler = kill_backend, + .data = &rnk_event, +}; TS_FUNCTION_INFO_V1(ts_remote_node_killer_set_event); -RemoteNodeKiller * -remote_node_killer_init(TSConnection *conn) +void +remote_node_killer_init(RemoteNodeKiller *rnk, const TSConnection *conn, + const DistTransactionEvent event) { - RemoteNodeKiller *rnk = palloc(sizeof(RemoteNodeKiller)); - + MemSet(rnk, 0, sizeof(*rnk)); rnk->conn = conn; - rnk->pid = remote_connecton_get_remote_pid(conn); - - /* do not throw error here on pid = 0 to avoid recursive abort */ - /* remote_connection_report_error(ERROR, res, conn, false, sql); */ - return rnk; + rnk->pid = remote_connection_get_remote_pid(conn); + rnk->killevent = event; } +static int +kill_backends(pid_t pid, int sig) +{ +#ifdef HAVE_SETSID + /* If PostgreSQL is compiled with setsid support, then the running + * PostgreSQL processes are in a process group and the negative pid kills + * all the other processes in the same group. */ + return kill(-pid, SIGTERM); +#else + return kill(pid, SIGTERM); +#endif +} + +/* + * Kill a remote process (data node). + * + * This only works if the process is on the same instance, which is true for + * regression tests. + * + * The function should not throw errors since it might be invoked from a + * transaction abort handler, which itself is invoked by errors. Throwing an + * error could cause unexpected recursion, and crash. + */ void remote_node_killer_kill(RemoteNodeKiller *rnk) { + PGPROC *proc; + int ret; + + if (rnk->num_kills > 0) + elog(WARNING, "cannot kill backend twice on the same event"); + /* * do not use pg_terminate_backend here because that does permission * checks through the catalog which requires you to be in a transaction */ - PGPROC *proc = BackendPidGetProc(rnk->pid); + proc = BackendPidGetProc(rnk->pid); if (proc == NULL) ereport(WARNING, (errmsg("PID %d is not a PostgreSQL server process", rnk->pid))); - kill_event = NULL; -#ifdef HAVE_SETSID - if (kill(-rnk->pid, SIGTERM)) -#else - if (kill(rnk->pid, SIGTERM)) -#endif + + rnk->num_kills++; + + ret = kill_backends(rnk->pid, SIGTERM); + + if (ret != 0) ereport(WARNING, (errmsg("could not send signal to process %d: %m", rnk->pid))); + else + { + /* Ensure that the backend is dead before proceeding. Otherwise, we + * might end up in a race condition where we check the status of the + * backend before it has actually exited, leading to flaky testing. */ + unsigned wait_count = 300; + + while (BackendPidGetProc(rnk->pid) != NULL) + { + if (wait_count == 0) + { + elog(WARNING, "timeout while waiting for killed backend to exit"); + break; + } + + wait_count--; + pg_usleep(100L); + } + } } Datum ts_remote_node_killer_set_event(PG_FUNCTION_ARGS) { - Datum arg1 = PG_GETARG_DATUM(0); - Datum arg2 = PG_GETARG_DATUM(1); - char *event; - char *server_name; + const char *event_name = PG_ARGISNULL(0) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(0)); + const char *server_name = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(1)); + DistTransactionEvent event; TSConnectionId id; - MemoryContext ctx; + TSConnection *conn; - testing_callback_call_hook = remote_node_killer_kill_on_event; + if (NULL == event_name || NULL == server_name) + elog(ERROR, "invalid argument"); - ctx = MemoryContextSwitchTo(TopTransactionContext); - - event = TextDatumGetCString(arg1); - kill_event = event; - - server_name = TextDatumGetCString(arg2); + remote_dist_txn_set_event_handler(&eventhandler); remote_connection_id_set(&id, GetForeignServerByName(server_name, false)->serverid, GetUserId()); - rnk_event = - remote_node_killer_init(remote_dist_txn_get_connection(id, REMOTE_TXN_NO_PREP_STMT)); - MemoryContextSwitchTo(ctx); + conn = remote_dist_txn_get_connection(id, REMOTE_TXN_NO_PREP_STMT); + Assert(conn); + event = remote_dist_txn_event_from_name(event_name); + remote_node_killer_init(&rnk_event, conn, event); + PG_RETURN_VOID(); } - -extern void -remote_node_killer_kill_on_event(const char *event) -{ - if (kill_event != NULL && strcmp(kill_event, event) == 0) - { - elog(WARNING, "kill event: %s", event); - remote_node_killer_kill(rnk_event); - } -} diff --git a/tsl/test/src/remote/node_killer.h b/tsl/test/src/remote/node_killer.h index a8afb311c..e2c9ba883 100644 --- a/tsl/test/src/remote/node_killer.h +++ b/tsl/test/src/remote/node_killer.h @@ -10,9 +10,18 @@ #include #include "remote/connection.h" +#include "remote/dist_txn.h" -typedef struct RemoteNodeKiller RemoteNodeKiller; -extern RemoteNodeKiller *remote_node_killer_init(TSConnection *conn); +typedef struct RemoteNodeKiller +{ + pid_t pid; + DistTransactionEvent killevent; + const TSConnection *conn; + int num_kills; +} RemoteNodeKiller; + +extern void remote_node_killer_init(RemoteNodeKiller *rnk, const TSConnection *conn, + const DistTransactionEvent event); void remote_node_killer_kill(RemoteNodeKiller *rnk); extern void remote_node_killer_kill_on_event(const char *event);