Add timeout argument to the ping_data_node()

This PR introduces a timeout argument and a new logic to the
timescale_internal.ping_data_node() function which allows
to handle io timeouts for nodes being unresponsive.

Fix #5312
This commit is contained in:
Dmitry Simonenko 2023-02-21 19:17:20 +02:00 committed by Dmitry Simonenko
parent 0976634399
commit f12a361ef7
10 changed files with 132 additions and 23 deletions

View File

@ -41,6 +41,7 @@ Sooner to that time, we will announce the specific version of TimescaleDB in whi
* #5253 Make data node command execution interruptible
* #5262 Extend enabling compression on a continuous aggregrate with 'compress_segmentby' and 'compress_orderby' parameters
* #5343 Set PortalContext when starting job
* #5312 Add timeout support to the ping_data_node()
**Bugfixes**
* #5214 Fix use of prepared statement in async module

View File

@ -3,7 +3,7 @@
-- LICENSE-APACHE for a copy of the license.
-- Check if a data node is up
CREATE OR REPLACE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME) RETURNS BOOLEAN
CREATE OR REPLACE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME, timeout INTERVAL = NULL) RETURNS BOOLEAN
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION _timescaledb_internal.remote_txn_heal_data_node(foreign_server_oid oid)

View File

@ -0,0 +1,4 @@
DROP FUNCTION _timescaledb_internal.ping_data_node(NAME);
CREATE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME, timeout INTERVAL = NULL) RETURNS BOOLEAN
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;

View File

@ -3,3 +3,8 @@ GRANT ALL ON _timescaledb_internal.job_errors TO PUBLIC;
ALTER EXTENSION timescaledb DROP VIEW timescaledb_information.job_errors;
DROP VIEW timescaledb_information.job_errors;
DROP FUNCTION _timescaledb_internal.ping_data_node(NAME, INTERVAL);
CREATE OR REPLACE FUNCTION _timescaledb_internal.ping_data_node(node_name NAME) RETURNS BOOLEAN
AS '@MODULE_PATHNAME@', 'ts_data_node_ping' LANGUAGE C VOLATILE;

View File

@ -598,7 +598,7 @@ connect_for_bootstrapping(const char *node_name, const char *const host, int32 p
{
List *node_options =
create_data_node_options(host, port, bootstrap_databases[i], username, password);
conn = remote_connection_open(node_name, node_options, &err);
conn = remote_connection_open(node_name, node_options, TS_NO_TIMEOUT, &err);
if (conn)
return conn;
@ -1778,7 +1778,7 @@ drop_data_node_database(const ForeignServer *server)
server = data_node_get_foreign_server(nodename, ACL_USAGE, true, false);
/* Open a connection to the bootstrap database using the new server options */
conn_options = remote_connection_prepare_auth_options(server, userid);
conn = remote_connection_open(nodename, conn_options, &err);
conn = remote_connection_open(nodename, conn_options, TS_NO_TIMEOUT, &err);
if (NULL != conn)
break;
@ -2056,6 +2056,9 @@ Datum
data_node_ping(PG_FUNCTION_ARGS)
{
const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
Interval *timeout = PG_ARGISNULL(1) ? NULL : PG_GETARG_INTERVAL_P(1);
TimestampTz endtime = TS_NO_TIMEOUT;
/* Allow anyone to ping a data node. Otherwise the
* timescaledb_information.data_node view won't work for those users. */
ForeignServer *server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false);
@ -2063,7 +2066,11 @@ data_node_ping(PG_FUNCTION_ARGS)
Assert(NULL != server);
success = remote_connection_ping(server->servername);
/* Get endtime in microseconds */
if (timeout)
endtime = GetCurrentTimestamp() + ts_get_interval_period_approx(timeout);
success = remote_connection_ping(server->servername, endtime);
PG_RETURN_DATUM(BoolGetDatum(success));
}

View File

@ -919,8 +919,26 @@ remote_connection_get_result_error(const PGresult *res, TSConnectionError *err)
fill_result_error(err, ERRCODE_CONNECTION_EXCEPTION, "", res);
}
static long
timeout_diff_ms(TimestampTz endtime)
{
TimestampTz now;
long secs;
int microsecs;
if (endtime == TS_NO_TIMEOUT)
return -1;
now = GetCurrentTimestamp();
if (now >= endtime)
return 0;
TimestampDifference(now, endtime, &secs, &microsecs);
return secs * 1000 + (microsecs / 1000);
}
PGresult *
remote_connection_get_result(const TSConnection *conn)
remote_connection_get_result(const TSConnection *conn, TimestampTz endtime)
{
PGresult *pgres = NULL;
int busy = 1;
@ -933,11 +951,23 @@ remote_connection_get_result(const TSConnection *conn)
if (busy == 1)
{
/* Busy, wait for readable */
uint32 events;
WaitEvent event;
long timeout_ms;
int ret;
ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_READABLE, NULL);
WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION);
events = WL_SOCKET_READABLE;
if (endtime != TS_NO_TIMEOUT)
events |= WL_TIMEOUT;
timeout_ms = timeout_diff_ms(endtime);
/* Busy, wait for readable */
ModifyWaitEvent(conn->wes, conn->sockeventpos, events, NULL);
ret = WaitEventSetWait(conn->wes, timeout_ms, &event, 1, PG_WAIT_EXTENSION);
/* Timeout */
if (ret == 0)
break;
if (event.events & WL_LATCH_SET)
{
@ -983,7 +1013,7 @@ remote_connection_get_result(const TSConnection *conn)
* string.
*/
PGresult *
remote_connection_exec(TSConnection *conn, const char *cmd)
remote_connection_exec_timeout(TSConnection *conn, const char *cmd, TimestampTz endtime)
{
WaitEvent event;
PGresult *res = NULL;
@ -999,11 +1029,23 @@ remote_connection_exec(TSConnection *conn, const char *cmd)
do
{
uint32 events;
long timeout_ms;
CHECK_FOR_INTERRUPTS();
events = WL_SOCKET_WRITEABLE;
if (endtime != TS_NO_TIMEOUT)
events |= WL_TIMEOUT;
timeout_ms = timeout_diff_ms(endtime);
/* Wait for writable socket in outer loop */
ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_WRITEABLE, NULL);
WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION);
ModifyWaitEvent(conn->wes, conn->sockeventpos, events, NULL);
ret = WaitEventSetWait(conn->wes, timeout_ms, &event, 1, PG_WAIT_EXTENSION);
/* Timeout */
if (ret == 0)
break;
if (event.events & WL_LATCH_SET)
{
@ -1034,7 +1076,7 @@ remote_connection_exec(TSConnection *conn, const char *cmd)
* concactinated, but that is not possible here due to lack of
* access to internals. PG14 handles that automatically, however.
*/
while ((res = remote_connection_get_result(conn)) != NULL)
while ((res = remote_connection_get_result(conn, endtime)) != NULL)
{
if (last_result)
{
@ -1085,6 +1127,12 @@ remote_connection_exec(TSConnection *conn, const char *cmd)
return res;
}
PGresult *
remote_connection_exec(TSConnection *conn, const char *cmd)
{
return remote_connection_exec_timeout(conn, cmd, TS_NO_TIMEOUT);
}
/*
* Must be a macro since va_start() must be called in the function that takes
* a variable number of arguments.
@ -1522,7 +1570,8 @@ setup_full_connection_options(List *connection_options, const char ***all_keywor
* an error message is optionally returned via the "errmsg" parameter.
*/
TSConnection *
remote_connection_open(const char *node_name, List *connection_options, char **errmsg)
remote_connection_open(const char *node_name, List *connection_options, TimestampTz endtime,
char **errmsg)
{
PGconn *pg_conn = NULL;
TSConnection *ts_conn = NULL;
@ -1553,6 +1602,8 @@ remote_connection_open(const char *node_name, List *connection_options, char **e
do
{
long timeout_ms;
int events;
int io_flag;
int rc;
@ -1565,6 +1616,14 @@ remote_connection_open(const char *node_name, List *connection_options, char **e
#endif
else
io_flag = WL_SOCKET_WRITEABLE;
if (endtime == TS_NO_TIMEOUT)
events = io_flag;
else
events = io_flag | WL_TIMEOUT;
timeout_ms = timeout_diff_ms(endtime);
/*
* Wait for latch or socket event. Note that it is not possible to
* reuse a WaitEventSet using the same socket file descriptor in each
@ -1574,10 +1633,17 @@ remote_connection_open(const char *node_name, List *connection_options, char **e
* correct file descriptor (socket) with PQsocket().
*/
rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | events,
PQsocket(pg_conn),
0,
timeout_ms,
PG_WAIT_EXTENSION);
if (rc & WL_TIMEOUT)
{
finish_connection(pg_conn, errmsg);
return NULL;
}
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
@ -1623,7 +1689,7 @@ TSConnection *
remote_connection_open_session(const char *node_name, List *connection_options, bool set_dist_id)
{
char *err = NULL;
TSConnection *conn = remote_connection_open(node_name, connection_options, &err);
TSConnection *conn = remote_connection_open(node_name, connection_options, TS_NO_TIMEOUT, &err);
if (NULL == conn)
ereport(ERROR,
@ -1858,7 +1924,7 @@ remote_connection_get_connstr(const char *node_name)
#define PING_QUERY "SELECT 1"
bool
remote_connection_ping(const char *node_name)
remote_connection_ping(const char *node_name, TimestampTz endtime)
{
Oid server_id = get_foreign_server_oid(node_name, false);
ForeignServer *server = GetForeignServer(server_id);
@ -1874,14 +1940,14 @@ remote_connection_ping(const char *node_name)
}
connection_options = remote_connection_prepare_auth_options(server, GetUserId());
conn = remote_connection_open(server->servername, connection_options, NULL);
conn = remote_connection_open(server->servername, connection_options, endtime, NULL);
if (NULL == conn)
return false;
if (PQstatus(conn->pg_conn) == CONNECTION_OK)
{
PGresult *res = remote_connection_exec(conn, PING_QUERY);
PGresult *res = remote_connection_exec_timeout(conn, PING_QUERY, endtime);
success = (PQresultStatus(res) == PGRES_TUPLES_OK);
}

View File

@ -58,7 +58,7 @@ typedef struct TSConnectionError
* `remote_connection_close`
*/
extern TSConnection *remote_connection_open(const char *node_name, List *connection_options,
char **errmsg);
TimestampTz endtime, char **errmsg);
extern TSConnection *remote_connection_open_session(const char *node_name, List *connection_options,
bool set_dist_id);
extern TSConnection *remote_connection_open_session_by_id(TSConnectionId id);
@ -69,9 +69,11 @@ extern int remote_connection_xact_depth_dec(TSConnection *conn);
extern void remote_connection_xact_transition_begin(TSConnection *conn);
extern void remote_connection_xact_transition_end(TSConnection *conn);
extern bool remote_connection_xact_is_transitioning(const TSConnection *conn);
extern bool remote_connection_ping(const char *node_name);
extern bool remote_connection_ping(const char *node_name, TimestampTz endtime);
extern void remote_connection_close(TSConnection *conn);
extern PGresult *remote_connection_get_result(const TSConnection *conn);
extern PGresult *remote_connection_get_result(const TSConnection *conn, TimestampTz endtime);
extern PGresult *remote_connection_exec_timeout(TSConnection *conn, const char *cmd,
TimestampTz endtime);
extern PGresult *remote_connection_exec(TSConnection *conn, const char *cmd);
extern PGresult *remote_connection_execf(TSConnection *conn, const char *fmt, ...)
pg_attribute_printf(2, 3);

View File

@ -674,6 +674,25 @@ SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1');
t
(1 row)
-- Ensure timeout returned by argument
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1', interval '0s');
ping_data_node
----------------
f
(1 row)
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1', interval '3s');
ping_data_node
----------------
t
(1 row)
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1', interval '1 day');
ping_data_node
----------------
t
(1 row)
-- Create data node referencing postgres_fdw
RESET ROLE;
CREATE EXTENSION postgres_fdw;

View File

@ -110,7 +110,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
_timescaledb_internal.main_table_from_hypertable(integer)
_timescaledb_internal.materialization_invalidation_log_delete(integer)
_timescaledb_internal.partialize_agg(anyelement)
_timescaledb_internal.ping_data_node(name)
_timescaledb_internal.ping_data_node(name,interval)
_timescaledb_internal.policy_compression(integer,jsonb)
_timescaledb_internal.policy_compression_check(jsonb)
_timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean)

View File

@ -340,6 +340,11 @@ SELECT * FROM _timescaledb_catalog.chunk_data_node;
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1');
-- Ensure timeout returned by argument
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1', interval '0s');
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1', interval '3s');
SELECT * FROM _timescaledb_internal.ping_data_node('data_node_1', interval '1 day');
-- Create data node referencing postgres_fdw
RESET ROLE;
CREATE EXTENSION postgres_fdw;