mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-26 00:00:54 +08:00
Refactor node killer test functionality
This change refactors and cleans up some of the test infrastructure around distributed transactions. In particular, the node killer now waits for the killed process to exit in an attempt to make tests more predictible.
This commit is contained in:
parent
7f93faad02
commit
506b1189b1
@ -662,7 +662,7 @@ remote_connection_xact_is_transitioning(const TSConnection *conn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
PGconn *
|
PGconn *
|
||||||
remote_connection_get_pg_conn(TSConnection *conn)
|
remote_connection_get_pg_conn(const TSConnection *conn)
|
||||||
{
|
{
|
||||||
Assert(conn != NULL);
|
Assert(conn != NULL);
|
||||||
return conn->pg_conn;
|
return conn->pg_conn;
|
||||||
|
@ -73,7 +73,7 @@ extern bool remote_connection_check_extension(TSConnection *conn, const char **o
|
|||||||
extern void remote_validate_extension_version(TSConnection *conn, const char *data_node_version);
|
extern void remote_validate_extension_version(TSConnection *conn, const char *data_node_version);
|
||||||
|
|
||||||
extern bool remote_connection_cancel_query(TSConnection *conn);
|
extern bool remote_connection_cancel_query(TSConnection *conn);
|
||||||
extern PGconn *remote_connection_get_pg_conn(TSConnection *conn);
|
extern PGconn *remote_connection_get_pg_conn(const TSConnection *conn);
|
||||||
extern bool remote_connection_is_processing(const TSConnection *conn);
|
extern bool remote_connection_is_processing(const TSConnection *conn);
|
||||||
extern void remote_connection_set_processing(TSConnection *conn, bool processing);
|
extern void remote_connection_set_processing(TSConnection *conn, bool processing);
|
||||||
extern bool remote_connection_configure_if_changed(TSConnection *conn);
|
extern bool remote_connection_configure_if_changed(TSConnection *conn);
|
||||||
|
@ -20,16 +20,56 @@
|
|||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
|
|
||||||
void (*testing_callback_call_hook)(const char *event) = NULL;
|
static const DistTransactionEventHandler *event_handler = NULL;
|
||||||
|
static const char *eventnames[MAX_DTXN_EVENT] = {
|
||||||
|
[DTXN_EVENT_ANY] = "any",
|
||||||
|
[DTXN_EVENT_PRE_COMMIT] = "pre-commit",
|
||||||
|
[DTXN_EVENT_WAIT_COMMIT] = "waiting-commit",
|
||||||
|
[DTXN_EVENT_PRE_ABORT] = "pre-abort",
|
||||||
|
[DTXN_EVENT_PRE_PREPARE] = "pre-prepare-transaction",
|
||||||
|
[DTXN_EVENT_WAIT_PREPARE] = "waiting-prepare-transaction",
|
||||||
|
[DTXN_EVENT_POST_PREPARE] = "post-prepare-transaction",
|
||||||
|
[DTXN_EVENT_PRE_COMMIT_PREPARED] = "pre-commit-prepared",
|
||||||
|
[DTXN_EVENT_WAIT_COMMIT_PREPARED] = "waiting-commit-prepared",
|
||||||
|
[DTXN_EVENT_SUB_XACT_ABORT] = "subxact-abort",
|
||||||
|
};
|
||||||
|
|
||||||
|
void
|
||||||
|
remote_dist_txn_set_event_handler(const DistTransactionEventHandler *handler)
|
||||||
|
{
|
||||||
|
event_handler = handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void
|
||||||
|
eventcallback(const DistTransactionEvent event)
|
||||||
|
{
|
||||||
|
if (NULL != event_handler && NULL != event_handler->handler)
|
||||||
|
event_handler->handler(event, event_handler->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
DistTransactionEvent
|
||||||
|
remote_dist_txn_event_from_name(const char *eventname)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < MAX_DTXN_EVENT; i++)
|
||||||
|
{
|
||||||
|
if (strcmp(eventname, eventnames[i]) == 0)
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
|
elog(ERROR, "invalid event name");
|
||||||
|
pg_unreachable();
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *
|
||||||
|
remote_dist_txn_event_name(const DistTransactionEvent event)
|
||||||
|
{
|
||||||
|
return eventnames[event];
|
||||||
|
}
|
||||||
|
|
||||||
#define testing_callback_call(event) \
|
|
||||||
do \
|
|
||||||
{ \
|
|
||||||
if (testing_callback_call_hook != NULL) \
|
|
||||||
testing_callback_call_hook(event); \
|
|
||||||
} while (0)
|
|
||||||
#else
|
#else
|
||||||
#define testing_callback_call(event) \
|
#define eventcallback(event) \
|
||||||
do \
|
do \
|
||||||
{ \
|
{ \
|
||||||
} while (0)
|
} while (0)
|
||||||
@ -88,7 +128,7 @@ dist_txn_xact_callback_1pc_pre_commit()
|
|||||||
RemoteTxn *remote_txn;
|
RemoteTxn *remote_txn;
|
||||||
AsyncRequestSet *ars = async_request_set_create();
|
AsyncRequestSet *ars = async_request_set_create();
|
||||||
|
|
||||||
testing_callback_call("pre-commit");
|
eventcallback(DTXN_EVENT_PRE_COMMIT);
|
||||||
|
|
||||||
/* send a commit to all connections */
|
/* send a commit to all connections */
|
||||||
remote_txn_store_foreach(store, remote_txn)
|
remote_txn_store_foreach(store, remote_txn)
|
||||||
@ -99,7 +139,7 @@ dist_txn_xact_callback_1pc_pre_commit()
|
|||||||
async_request_set_add(ars, remote_txn_async_send_commit(remote_txn));
|
async_request_set_add(ars, remote_txn_async_send_commit(remote_txn));
|
||||||
}
|
}
|
||||||
|
|
||||||
testing_callback_call("waiting-commit");
|
eventcallback(DTXN_EVENT_WAIT_COMMIT);
|
||||||
|
|
||||||
/* async collect all the replies */
|
/* async collect all the replies */
|
||||||
async_request_set_wait_all_ok_commands(ars);
|
async_request_set_wait_all_ok_commands(ars);
|
||||||
@ -112,7 +152,7 @@ dist_txn_xact_callback_abort()
|
|||||||
{
|
{
|
||||||
RemoteTxn *remote_txn;
|
RemoteTxn *remote_txn;
|
||||||
|
|
||||||
testing_callback_call("pre-abort");
|
eventcallback(DTXN_EVENT_PRE_ABORT);
|
||||||
|
|
||||||
remote_txn_store_foreach(store, remote_txn)
|
remote_txn_store_foreach(store, remote_txn)
|
||||||
{
|
{
|
||||||
@ -245,7 +285,7 @@ dist_txn_send_prepare_transaction()
|
|||||||
AsyncResponse *error_response = NULL;
|
AsyncResponse *error_response = NULL;
|
||||||
AsyncResponse *res;
|
AsyncResponse *res;
|
||||||
|
|
||||||
testing_callback_call("pre-prepare-transaction");
|
eventcallback(DTXN_EVENT_PRE_PREPARE);
|
||||||
|
|
||||||
/* send a prepare transaction to all connections */
|
/* send a prepare transaction to all connections */
|
||||||
remote_txn_store_foreach(store, remote_txn)
|
remote_txn_store_foreach(store, remote_txn)
|
||||||
@ -257,7 +297,7 @@ dist_txn_send_prepare_transaction()
|
|||||||
async_request_set_add(ars, req);
|
async_request_set_add(ars, req);
|
||||||
}
|
}
|
||||||
|
|
||||||
testing_callback_call("waiting-prepare-transaction");
|
eventcallback(DTXN_EVENT_WAIT_PREPARE);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* async collect the replies. Since errors in PREPARE TRANSACTION are not
|
* async collect the replies. Since errors in PREPARE TRANSACTION are not
|
||||||
@ -303,7 +343,7 @@ dist_txn_send_prepare_transaction()
|
|||||||
if (error_response != NULL)
|
if (error_response != NULL)
|
||||||
async_response_report_error(error_response, ERROR);
|
async_response_report_error(error_response, ERROR);
|
||||||
|
|
||||||
testing_callback_call("post-prepare-transaction");
|
eventcallback(DTXN_EVENT_POST_PREPARE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -332,7 +372,7 @@ dist_txn_send_commit_prepared_transaction()
|
|||||||
async_request_set_add(ars, req);
|
async_request_set_add(ars, req);
|
||||||
}
|
}
|
||||||
|
|
||||||
testing_callback_call("waiting-commit-prepared");
|
eventcallback(DTXN_EVENT_WAIT_COMMIT_PREPARED);
|
||||||
|
|
||||||
/* async collect the replies */
|
/* async collect the replies */
|
||||||
while ((res = async_request_set_wait_any_response(ars)))
|
while ((res = async_request_set_wait_any_response(ars)))
|
||||||
@ -397,7 +437,7 @@ dist_txn_xact_callback_2pc(XactEvent event, void *arg)
|
|||||||
break;
|
break;
|
||||||
case XACT_EVENT_PARALLEL_COMMIT:
|
case XACT_EVENT_PARALLEL_COMMIT:
|
||||||
case XACT_EVENT_COMMIT:
|
case XACT_EVENT_COMMIT:
|
||||||
testing_callback_call("pre-commit-prepared");
|
eventcallback(DTXN_EVENT_PRE_COMMIT_PREPARED);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We send a commit here so that future commands on this
|
* We send a commit here so that future commands on this
|
||||||
@ -461,7 +501,7 @@ dist_txn_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
|
|||||||
reject_transactions_with_incomplete_transitions();
|
reject_transactions_with_incomplete_transitions();
|
||||||
|
|
||||||
if (event == SUBXACT_EVENT_ABORT_SUB)
|
if (event == SUBXACT_EVENT_ABORT_SUB)
|
||||||
testing_callback_call("subxact-abort");
|
eventcallback(DTXN_EVENT_SUB_XACT_ABORT);
|
||||||
|
|
||||||
curlevel = GetCurrentTransactionNestLevel();
|
curlevel = GetCurrentTransactionNestLevel();
|
||||||
|
|
||||||
|
@ -18,7 +18,33 @@ extern TSConnection *remote_dist_txn_get_connection(TSConnectionId id,
|
|||||||
RemoteTxnPrepStmtOption prep_stmt);
|
RemoteTxnPrepStmtOption prep_stmt);
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
extern void (*testing_callback_call_hook)(const char *event);
|
|
||||||
|
typedef enum DistTransactionEvent
|
||||||
|
{
|
||||||
|
DTXN_EVENT_ANY,
|
||||||
|
DTXN_EVENT_PRE_COMMIT,
|
||||||
|
DTXN_EVENT_WAIT_COMMIT,
|
||||||
|
DTXN_EVENT_PRE_ABORT,
|
||||||
|
DTXN_EVENT_PRE_PREPARE,
|
||||||
|
DTXN_EVENT_WAIT_PREPARE,
|
||||||
|
DTXN_EVENT_POST_PREPARE,
|
||||||
|
DTXN_EVENT_PRE_COMMIT_PREPARED,
|
||||||
|
DTXN_EVENT_WAIT_COMMIT_PREPARED,
|
||||||
|
DTXN_EVENT_SUB_XACT_ABORT,
|
||||||
|
} DistTransactionEvent;
|
||||||
|
|
||||||
|
#define MAX_DTXN_EVENT (DTXN_EVENT_SUB_XACT_ABORT + 1)
|
||||||
|
|
||||||
|
typedef struct DistTransactionEventHandler
|
||||||
|
{
|
||||||
|
void (*handler)(const DistTransactionEvent event, void *data);
|
||||||
|
void *data;
|
||||||
|
} DistTransactionEventHandler;
|
||||||
|
|
||||||
|
extern void remote_dist_txn_set_event_handler(const DistTransactionEventHandler *handler);
|
||||||
|
extern DistTransactionEvent remote_dist_txn_event_from_name(const char *eventname);
|
||||||
|
extern const char *remote_dist_txn_event_name(const DistTransactionEvent event);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void _remote_dist_txn_init(void);
|
void _remote_dist_txn_init(void);
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
#include "connection.h"
|
#include "connection.h"
|
||||||
#include "test_utils.h"
|
#include "test_utils.h"
|
||||||
#include "node_killer.h"
|
#include "node_killer.h"
|
||||||
|
#include "remote/dist_txn.h"
|
||||||
|
|
||||||
TS_FUNCTION_INFO_V1(ts_test_remote_async);
|
TS_FUNCTION_INFO_V1(ts_test_remote_async);
|
||||||
|
|
||||||
@ -204,13 +205,13 @@ test_node_death()
|
|||||||
AsyncResponse *response;
|
AsyncResponse *response;
|
||||||
PGresult *pg_res;
|
PGresult *pg_res;
|
||||||
AsyncRequestSet *set;
|
AsyncRequestSet *set;
|
||||||
RemoteNodeKiller *rnk;
|
RemoteNodeKiller rnk;
|
||||||
|
|
||||||
/* killed node causes an error response, then a communication error */
|
/* killed node causes an error response, then a communication error */
|
||||||
rnk = remote_node_killer_init(conn);
|
remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY);
|
||||||
set = async_request_set_create();
|
set = async_request_set_create();
|
||||||
async_request_set_add_sql(set, conn, "SELECT 1");
|
async_request_set_add_sql(set, conn, "SELECT 1");
|
||||||
remote_node_killer_kill(rnk);
|
remote_node_killer_kill(&rnk);
|
||||||
response = async_request_set_wait_any_response_deadline(set, TS_NO_TIMEOUT);
|
response = async_request_set_wait_any_response_deadline(set, TS_NO_TIMEOUT);
|
||||||
TestAssertTrue(async_response_get_type(response) == RESPONSE_RESULT);
|
TestAssertTrue(async_response_get_type(response) == RESPONSE_RESULT);
|
||||||
result = (AsyncResponseResult *) response;
|
result = (AsyncResponseResult *) response;
|
||||||
@ -233,10 +234,10 @@ test_node_death()
|
|||||||
|
|
||||||
/* test error throwing in async_request_set_wait_any_result */
|
/* test error throwing in async_request_set_wait_any_result */
|
||||||
conn = get_connection();
|
conn = get_connection();
|
||||||
rnk = remote_node_killer_init(conn);
|
remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY);
|
||||||
set = async_request_set_create();
|
set = async_request_set_create();
|
||||||
async_request_set_add_sql(set, conn, "SELECT 1");
|
async_request_set_add_sql(set, conn, "SELECT 1");
|
||||||
remote_node_killer_kill(rnk);
|
remote_node_killer_kill(&rnk);
|
||||||
|
|
||||||
/* first we get error result */
|
/* first we get error result */
|
||||||
TestAssertTrue(NULL != async_request_set_wait_any_result(set));
|
TestAssertTrue(NULL != async_request_set_wait_any_result(set));
|
||||||
@ -244,18 +245,19 @@ test_node_death()
|
|||||||
|
|
||||||
/* do cancel query before first response */
|
/* do cancel query before first response */
|
||||||
conn = get_connection();
|
conn = get_connection();
|
||||||
rnk = remote_node_killer_init(conn);
|
remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY);
|
||||||
set = async_request_set_create();
|
set = async_request_set_create();
|
||||||
async_request_set_add_sql(set, conn, "SELECT 1");
|
async_request_set_add_sql(set, conn, "SELECT 1");
|
||||||
remote_node_killer_kill(rnk);
|
|
||||||
|
remote_node_killer_kill(&rnk);
|
||||||
TestAssertTrue(false == remote_connection_cancel_query(conn));
|
TestAssertTrue(false == remote_connection_cancel_query(conn));
|
||||||
|
|
||||||
/* do cancel query after seeing error */
|
/* do cancel query after seeing error */
|
||||||
conn = get_connection();
|
conn = get_connection();
|
||||||
rnk = remote_node_killer_init(conn);
|
remote_node_killer_init(&rnk, conn, DTXN_EVENT_ANY);
|
||||||
set = async_request_set_create();
|
set = async_request_set_create();
|
||||||
async_request_set_add_sql(set, conn, "SELECT 1");
|
async_request_set_add_sql(set, conn, "SELECT 1");
|
||||||
remote_node_killer_kill(rnk);
|
remote_node_killer_kill(&rnk);
|
||||||
|
|
||||||
/* first we get error result */
|
/* first we get error result */
|
||||||
TestEnsureError(async_request_set_wait_ok_result(set));
|
TestEnsureError(async_request_set_wait_ok_result(set));
|
||||||
|
@ -244,7 +244,7 @@ ts_test_remote_connection(PG_FUNCTION_ARGS)
|
|||||||
}
|
}
|
||||||
|
|
||||||
pid_t
|
pid_t
|
||||||
remote_connecton_get_remote_pid(TSConnection *conn)
|
remote_connection_get_remote_pid(const TSConnection *conn)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
char *pid_string;
|
char *pid_string;
|
||||||
@ -255,8 +255,8 @@ remote_connecton_get_remote_pid(TSConnection *conn)
|
|||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
TestAssertTrue(1 == PQntuples(res));
|
Assert(1 == PQntuples(res));
|
||||||
TestAssertTrue(1 == PQnfields(res));
|
Assert(1 == PQnfields(res));
|
||||||
|
|
||||||
pid_string = PQgetvalue(res, 0, 0);
|
pid_string = PQgetvalue(res, 0, 0);
|
||||||
pid_long = strtol(pid_string, NULL, 10);
|
pid_long = strtol(pid_string, NULL, 10);
|
||||||
@ -266,7 +266,7 @@ remote_connecton_get_remote_pid(TSConnection *conn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
char *
|
char *
|
||||||
remote_connecton_get_application_name(TSConnection *conn)
|
remote_connection_get_application_name(const TSConnection *conn)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
char *app_name;
|
char *app_name;
|
||||||
@ -276,8 +276,8 @@ remote_connecton_get_application_name(TSConnection *conn)
|
|||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
TestAssertTrue(1 == PQntuples(res));
|
Assert(1 == PQntuples(res));
|
||||||
TestAssertTrue(1 == PQnfields(res));
|
Assert(1 == PQnfields(res));
|
||||||
|
|
||||||
app_name = pstrdup(PQgetvalue(res, 0, 0));
|
app_name = pstrdup(PQgetvalue(res, 0, 0));
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
#include <remote/connection.h>
|
#include <remote/connection.h>
|
||||||
|
|
||||||
extern TSConnection *get_connection(void);
|
extern TSConnection *get_connection(void);
|
||||||
extern pid_t remote_connecton_get_remote_pid(TSConnection *conn);
|
extern pid_t remote_connection_get_remote_pid(const TSConnection *conn);
|
||||||
extern char *remote_connecton_get_application_name(TSConnection *conn);
|
extern char *remote_connection_get_application_name(const TSConnection *conn);
|
||||||
|
|
||||||
#endif /* TIMESCALEDB_TSL_TEST_REMOTE_CONNECTION_H */
|
#endif /* TIMESCALEDB_TSL_TEST_REMOTE_CONNECTION_H */
|
||||||
|
@ -52,17 +52,17 @@ test_basic_cache()
|
|||||||
GetUserId());
|
GetUserId());
|
||||||
|
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_1 = remote_connecton_get_remote_pid(conn_1);
|
pid_1 = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 != 0);
|
TestAssertTrue(pid_1 != 0);
|
||||||
|
|
||||||
conn_2 = remote_connection_cache_get_connection(id_2);
|
conn_2 = remote_connection_cache_get_connection(id_2);
|
||||||
pid_2 = remote_connecton_get_remote_pid(conn_2);
|
pid_2 = remote_connection_get_remote_pid(conn_2);
|
||||||
TestAssertTrue(pid_2 != 0);
|
TestAssertTrue(pid_2 != 0);
|
||||||
|
|
||||||
TestAssertTrue(pid_1 != pid_2);
|
TestAssertTrue(pid_1 != pid_2);
|
||||||
|
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_prime = remote_connecton_get_remote_pid(conn_1);
|
pid_prime = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 == pid_prime);
|
TestAssertTrue(pid_1 == pid_prime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ test_invalidate_server()
|
|||||||
GetUserId());
|
GetUserId());
|
||||||
|
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_1 = remote_connecton_get_remote_pid(conn_1);
|
pid_1 = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 != 0);
|
TestAssertTrue(pid_1 != 0);
|
||||||
|
|
||||||
/* simulate an invalidation in another backend */
|
/* simulate an invalidation in another backend */
|
||||||
@ -154,13 +154,13 @@ test_invalidate_server()
|
|||||||
/* Should get a different connection since we invalidated the foreign
|
/* Should get a different connection since we invalidated the foreign
|
||||||
* server and didn't yet start a transaction on the remote node. */
|
* server and didn't yet start a transaction on the remote node. */
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_prime = remote_connecton_get_remote_pid(conn_1);
|
pid_prime = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 != pid_prime);
|
TestAssertTrue(pid_1 != pid_prime);
|
||||||
|
|
||||||
/* Test that connections remain despite invalidations during ongoing
|
/* Test that connections remain despite invalidations during ongoing
|
||||||
* remote transaction */
|
* remote transaction */
|
||||||
pid_1 = pid_prime;
|
pid_1 = pid_prime;
|
||||||
original_application_name = remote_connecton_get_application_name(conn_1);
|
original_application_name = remote_connection_get_application_name(conn_1);
|
||||||
BeginInternalSubTransaction("sub1");
|
BeginInternalSubTransaction("sub1");
|
||||||
|
|
||||||
/* Start a remote transaction on the connection */
|
/* Start a remote transaction on the connection */
|
||||||
@ -168,20 +168,20 @@ test_invalidate_server()
|
|||||||
invalidate_server();
|
invalidate_server();
|
||||||
|
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_prime = remote_connecton_get_remote_pid(conn_1);
|
pid_prime = remote_connection_get_remote_pid(conn_1);
|
||||||
|
|
||||||
/* Connection should be the same despite invalidation since we're in a
|
/* Connection should be the same despite invalidation since we're in a
|
||||||
* transaction */
|
* transaction */
|
||||||
TestAssertTrue(pid_1 == pid_prime);
|
TestAssertTrue(pid_1 == pid_prime);
|
||||||
TestAssertTrue(
|
TestAssertTrue(
|
||||||
strcmp(original_application_name, remote_connecton_get_application_name(conn_1)) == 0);
|
strcmp(original_application_name, remote_connection_get_application_name(conn_1)) == 0);
|
||||||
|
|
||||||
RollbackAndReleaseCurrentSubTransaction();
|
RollbackAndReleaseCurrentSubTransaction();
|
||||||
|
|
||||||
/* After rollback, we're still in a remote transaction. Connection should
|
/* After rollback, we're still in a remote transaction. Connection should
|
||||||
* be the same. */
|
* be the same. */
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_prime = remote_connecton_get_remote_pid(conn_1);
|
pid_prime = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 == pid_prime);
|
TestAssertTrue(pid_1 == pid_prime);
|
||||||
pid_1 = pid_prime;
|
pid_1 = pid_prime;
|
||||||
}
|
}
|
||||||
@ -199,14 +199,14 @@ test_remove()
|
|||||||
GetUserId());
|
GetUserId());
|
||||||
|
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_1 = remote_connecton_get_remote_pid(conn_1);
|
pid_1 = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 != 0);
|
TestAssertTrue(pid_1 != 0);
|
||||||
|
|
||||||
remote_connection_cache_remove(id_1);
|
remote_connection_cache_remove(id_1);
|
||||||
|
|
||||||
/* even using the same pin, get new connection */
|
/* even using the same pin, get new connection */
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_prime = remote_connecton_get_remote_pid(conn_1);
|
pid_prime = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 != pid_prime);
|
TestAssertTrue(pid_1 != pid_prime);
|
||||||
|
|
||||||
pid_1 = pid_prime;
|
pid_1 = pid_prime;
|
||||||
@ -215,7 +215,7 @@ test_remove()
|
|||||||
remote_connection_cache_remove(id_1);
|
remote_connection_cache_remove(id_1);
|
||||||
|
|
||||||
conn_1 = remote_connection_cache_get_connection(id_1);
|
conn_1 = remote_connection_cache_get_connection(id_1);
|
||||||
pid_prime = remote_connecton_get_remote_pid(conn_1);
|
pid_prime = remote_connection_get_remote_pid(conn_1);
|
||||||
TestAssertTrue(pid_1 != pid_prime);
|
TestAssertTrue(pid_1 != pid_prime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,88 +17,136 @@
|
|||||||
#include "export.h"
|
#include "export.h"
|
||||||
#include "remote/connection.h"
|
#include "remote/connection.h"
|
||||||
#include "remote/dist_txn.h"
|
#include "remote/dist_txn.h"
|
||||||
|
|
||||||
#include "test_utils.h"
|
#include "test_utils.h"
|
||||||
#include "connection.h"
|
#include "connection.h"
|
||||||
|
|
||||||
typedef struct RemoteNodeKiller
|
static RemoteNodeKiller rnk_event = {
|
||||||
{
|
.conn = NULL,
|
||||||
pid_t pid;
|
.pid = -1,
|
||||||
TSConnection *conn;
|
.killevent = DTXN_EVENT_ANY,
|
||||||
} RemoteNodeKiller;
|
};
|
||||||
|
|
||||||
static char *kill_event = NULL;
|
static void
|
||||||
static RemoteNodeKiller *rnk_event = NULL;
|
kill_backend(const DistTransactionEvent event, void *data)
|
||||||
|
{
|
||||||
|
RemoteNodeKiller *rnk = data;
|
||||||
|
|
||||||
|
if (event == rnk->killevent || rnk->killevent == DTXN_EVENT_ANY)
|
||||||
|
{
|
||||||
|
elog(WARNING, "kill event: %s", remote_dist_txn_event_name(rnk->killevent));
|
||||||
|
remote_node_killer_kill(&rnk_event);
|
||||||
|
remote_dist_txn_set_event_handler(NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static DistTransactionEventHandler eventhandler = {
|
||||||
|
.handler = kill_backend,
|
||||||
|
.data = &rnk_event,
|
||||||
|
};
|
||||||
|
|
||||||
TS_FUNCTION_INFO_V1(ts_remote_node_killer_set_event);
|
TS_FUNCTION_INFO_V1(ts_remote_node_killer_set_event);
|
||||||
|
|
||||||
RemoteNodeKiller *
|
void
|
||||||
remote_node_killer_init(TSConnection *conn)
|
remote_node_killer_init(RemoteNodeKiller *rnk, const TSConnection *conn,
|
||||||
|
const DistTransactionEvent event)
|
||||||
{
|
{
|
||||||
RemoteNodeKiller *rnk = palloc(sizeof(RemoteNodeKiller));
|
MemSet(rnk, 0, sizeof(*rnk));
|
||||||
|
|
||||||
rnk->conn = conn;
|
rnk->conn = conn;
|
||||||
rnk->pid = remote_connecton_get_remote_pid(conn);
|
rnk->pid = remote_connection_get_remote_pid(conn);
|
||||||
|
rnk->killevent = event;
|
||||||
/* do not throw error here on pid = 0 to avoid recursive abort */
|
|
||||||
/* remote_connection_report_error(ERROR, res, conn, false, sql); */
|
|
||||||
return rnk;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
kill_backends(pid_t pid, int sig)
|
||||||
|
{
|
||||||
|
#ifdef HAVE_SETSID
|
||||||
|
/* If PostgreSQL is compiled with setsid support, then the running
|
||||||
|
* PostgreSQL processes are in a process group and the negative pid kills
|
||||||
|
* all the other processes in the same group. */
|
||||||
|
return kill(-pid, SIGTERM);
|
||||||
|
#else
|
||||||
|
return kill(pid, SIGTERM);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Kill a remote process (data node).
|
||||||
|
*
|
||||||
|
* This only works if the process is on the same instance, which is true for
|
||||||
|
* regression tests.
|
||||||
|
*
|
||||||
|
* The function should not throw errors since it might be invoked from a
|
||||||
|
* transaction abort handler, which itself is invoked by errors. Throwing an
|
||||||
|
* error could cause unexpected recursion, and crash.
|
||||||
|
*/
|
||||||
void
|
void
|
||||||
remote_node_killer_kill(RemoteNodeKiller *rnk)
|
remote_node_killer_kill(RemoteNodeKiller *rnk)
|
||||||
{
|
{
|
||||||
|
PGPROC *proc;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
if (rnk->num_kills > 0)
|
||||||
|
elog(WARNING, "cannot kill backend twice on the same event");
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* do not use pg_terminate_backend here because that does permission
|
* do not use pg_terminate_backend here because that does permission
|
||||||
* checks through the catalog which requires you to be in a transaction
|
* checks through the catalog which requires you to be in a transaction
|
||||||
*/
|
*/
|
||||||
PGPROC *proc = BackendPidGetProc(rnk->pid);
|
proc = BackendPidGetProc(rnk->pid);
|
||||||
|
|
||||||
if (proc == NULL)
|
if (proc == NULL)
|
||||||
ereport(WARNING, (errmsg("PID %d is not a PostgreSQL server process", rnk->pid)));
|
ereport(WARNING, (errmsg("PID %d is not a PostgreSQL server process", rnk->pid)));
|
||||||
kill_event = NULL;
|
|
||||||
#ifdef HAVE_SETSID
|
rnk->num_kills++;
|
||||||
if (kill(-rnk->pid, SIGTERM))
|
|
||||||
#else
|
ret = kill_backends(rnk->pid, SIGTERM);
|
||||||
if (kill(rnk->pid, SIGTERM))
|
|
||||||
#endif
|
if (ret != 0)
|
||||||
ereport(WARNING, (errmsg("could not send signal to process %d: %m", rnk->pid)));
|
ereport(WARNING, (errmsg("could not send signal to process %d: %m", rnk->pid)));
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Ensure that the backend is dead before proceeding. Otherwise, we
|
||||||
|
* might end up in a race condition where we check the status of the
|
||||||
|
* backend before it has actually exited, leading to flaky testing. */
|
||||||
|
unsigned wait_count = 300;
|
||||||
|
|
||||||
|
while (BackendPidGetProc(rnk->pid) != NULL)
|
||||||
|
{
|
||||||
|
if (wait_count == 0)
|
||||||
|
{
|
||||||
|
elog(WARNING, "timeout while waiting for killed backend to exit");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
wait_count--;
|
||||||
|
pg_usleep(100L);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum
|
Datum
|
||||||
ts_remote_node_killer_set_event(PG_FUNCTION_ARGS)
|
ts_remote_node_killer_set_event(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Datum arg1 = PG_GETARG_DATUM(0);
|
const char *event_name = PG_ARGISNULL(0) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(0));
|
||||||
Datum arg2 = PG_GETARG_DATUM(1);
|
const char *server_name = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(1));
|
||||||
char *event;
|
DistTransactionEvent event;
|
||||||
char *server_name;
|
|
||||||
TSConnectionId id;
|
TSConnectionId id;
|
||||||
MemoryContext ctx;
|
TSConnection *conn;
|
||||||
|
|
||||||
testing_callback_call_hook = remote_node_killer_kill_on_event;
|
if (NULL == event_name || NULL == server_name)
|
||||||
|
elog(ERROR, "invalid argument");
|
||||||
|
|
||||||
ctx = MemoryContextSwitchTo(TopTransactionContext);
|
remote_dist_txn_set_event_handler(&eventhandler);
|
||||||
|
|
||||||
event = TextDatumGetCString(arg1);
|
|
||||||
kill_event = event;
|
|
||||||
|
|
||||||
server_name = TextDatumGetCString(arg2);
|
|
||||||
|
|
||||||
remote_connection_id_set(&id,
|
remote_connection_id_set(&id,
|
||||||
GetForeignServerByName(server_name, false)->serverid,
|
GetForeignServerByName(server_name, false)->serverid,
|
||||||
GetUserId());
|
GetUserId());
|
||||||
rnk_event =
|
|
||||||
remote_node_killer_init(remote_dist_txn_get_connection(id, REMOTE_TXN_NO_PREP_STMT));
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(ctx);
|
conn = remote_dist_txn_get_connection(id, REMOTE_TXN_NO_PREP_STMT);
|
||||||
|
Assert(conn);
|
||||||
|
event = remote_dist_txn_event_from_name(event_name);
|
||||||
|
remote_node_killer_init(&rnk_event, conn, event);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
extern void
|
|
||||||
remote_node_killer_kill_on_event(const char *event)
|
|
||||||
{
|
|
||||||
if (kill_event != NULL && strcmp(kill_event, event) == 0)
|
|
||||||
{
|
|
||||||
elog(WARNING, "kill event: %s", event);
|
|
||||||
remote_node_killer_kill(rnk_event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -10,9 +10,18 @@
|
|||||||
#include <libpq-fe.h>
|
#include <libpq-fe.h>
|
||||||
|
|
||||||
#include "remote/connection.h"
|
#include "remote/connection.h"
|
||||||
|
#include "remote/dist_txn.h"
|
||||||
|
|
||||||
typedef struct RemoteNodeKiller RemoteNodeKiller;
|
typedef struct RemoteNodeKiller
|
||||||
extern RemoteNodeKiller *remote_node_killer_init(TSConnection *conn);
|
{
|
||||||
|
pid_t pid;
|
||||||
|
DistTransactionEvent killevent;
|
||||||
|
const TSConnection *conn;
|
||||||
|
int num_kills;
|
||||||
|
} RemoteNodeKiller;
|
||||||
|
|
||||||
|
extern void remote_node_killer_init(RemoteNodeKiller *rnk, const TSConnection *conn,
|
||||||
|
const DistTransactionEvent event);
|
||||||
void remote_node_killer_kill(RemoteNodeKiller *rnk);
|
void remote_node_killer_kill(RemoteNodeKiller *rnk);
|
||||||
|
|
||||||
extern void remote_node_killer_kill_on_event(const char *event);
|
extern void remote_node_killer_kill_on_event(const char *event);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user