diff --git a/sql/ddl_internal.sql b/sql/ddl_internal.sql index fe0f70080..135d91963 100644 --- a/sql/ddl_internal.sql +++ b/sql/ddl_internal.sql @@ -50,3 +50,7 @@ BEGIN RAISE 'subscription sync wait timedout'; END $BODY$ SET search_path TO pg_catalog, pg_temp; + +CREATE OR REPLACE FUNCTION _timescaledb_internal.health() RETURNS +TABLE (node_name NAME, healthy BOOL, in_recovery BOOL, error TEXT) +AS '@MODULE_PATHNAME@', 'ts_health_check' LANGUAGE C VOLATILE; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 8ee47e78e..c32bbb085 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -224,3 +224,5 @@ ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats ADD CONSTRAINT bgw_policy_chunk_stats_job_id_fkey FOREIGN KEY(job_id) REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE; + +DROP FUNCTION _timescaledb_internal.health; diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index eec82d93d..f0128af24 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -123,6 +123,7 @@ CROSSMODULE_WRAPPER(dist_remote_hypertable_index_info); CROSSMODULE_WRAPPER(distributed_exec); CROSSMODULE_WRAPPER(create_distributed_restore_point); CROSSMODULE_WRAPPER(hypertable_distributed_set_replication_factor); +CROSSMODULE_WRAPPER(health_check); TS_FUNCTION_INFO_V1(ts_dist_set_id); Datum @@ -540,6 +541,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .chunk_create_replica_table = error_no_default_fn_pg_community, .hypertable_distributed_set_replication_factor = error_no_default_fn_pg_community, .update_compressed_chunk_relstats = update_compressed_chunk_relstats_default, + .health_check = error_no_default_fn_pg_community, }; TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default; diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 1207a60ab..6e4fd0549 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -202,6 +202,7 @@ typedef struct CrossModuleFunctions TupleTableSlot *(*compress_row_exec)(CompressSingleRowState *cr, TupleTableSlot *slot); void (*compress_row_end)(CompressSingleRowState *cr); void (*compress_row_destroy)(CompressSingleRowState *cr); + PGFunction health_check; } CrossModuleFunctions; extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions; diff --git a/tsl/src/init.c b/tsl/src/init.c index 32314005d..3cd6b3955 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -50,6 +50,7 @@ #include "remote/dist_commands.h" #include "remote/dist_copy.h" #include "remote/dist_txn.h" +#include "remote/healthcheck.h" #include "remote/txn_id.h" #include "remote/txn_resolve.h" #include "reorder.h" @@ -230,6 +231,7 @@ CrossModuleFunctions tsl_cm_functions = { .hypertable_distributed_set_replication_factor = hypertable_set_replication_factor, .cache_syscache_invalidate = cache_syscache_invalidate, .update_compressed_chunk_relstats = update_compressed_chunk_relstats, + .health_check = ts_dist_health_check, }; static void diff --git a/tsl/src/remote/CMakeLists.txt b/tsl/src/remote/CMakeLists.txt index c02b46255..1bb485dde 100644 --- a/tsl/src/remote/CMakeLists.txt +++ b/tsl/src/remote/CMakeLists.txt @@ -10,6 +10,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/dist_copy.c ${CMAKE_CURRENT_SOURCE_DIR}/dist_ddl.c ${CMAKE_CURRENT_SOURCE_DIR}/copy_fetcher.c + ${CMAKE_CURRENT_SOURCE_DIR}/healthcheck.c ${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c ${CMAKE_CURRENT_SOURCE_DIR}/tuplefactory.c ${CMAKE_CURRENT_SOURCE_DIR}/txn.c diff --git a/tsl/src/remote/healthcheck.c b/tsl/src/remote/healthcheck.c new file mode 100644 index 000000000..183495b55 --- /dev/null +++ b/tsl/src/remote/healthcheck.c @@ -0,0 +1,313 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include + +#include "healthcheck.h" +#include "data_node.h" +#include "dist_commands.h" +#include "dist_util.h" + +/* + * Functions and data structures for printing a health check result. + */ +enum Anum_show_conn +{ + Anum_health_node_name = 1, + Anum_health_healthy, + Anum_health_in_recovery, + Anum_health_error, + _Anum_health_max, +}; + +#define Natts_health (_Anum_health_max - 1) + +/* + * Health check result. + * + * Produces a tuple with four columns: + * + * 1. Node name (NULL for the access node itself) + * + * 2. A "healthy" boolean. An access node returns "false" in case a data node + * is not responding. + * + * 3. A boolean telling if the node is in recovery. Note that the "healthy" + * status will be "true" while the node is in recovery. + * + * 4. An optional error message explaining the reason for "healthy" being + * false. + */ +static HeapTuple +create_health_check_tuple(const char *data_node, bool in_recovery, TupleDesc tupdesc) +{ + Datum values[Natts_health]; + bool nulls[Natts_health]; + + memset(nulls, true, sizeof(nulls)); + + if (NULL != data_node) + { + NameData *node_name = palloc(sizeof(NameData)); + namestrcpy(node_name, data_node); + values[AttrNumberGetAttrOffset(Anum_health_node_name)] = NameGetDatum(node_name); + nulls[AttrNumberGetAttrOffset(Anum_health_node_name)] = false; + } + + values[AttrNumberGetAttrOffset(Anum_health_healthy)] = BoolGetDatum(in_recovery ? false : true); + nulls[AttrNumberGetAttrOffset(Anum_health_healthy)] = false; + values[AttrNumberGetAttrOffset(Anum_health_in_recovery)] = BoolGetDatum(in_recovery); + nulls[AttrNumberGetAttrOffset(Anum_health_in_recovery)] = false; + + return heap_form_tuple(tupdesc, values, nulls); +} + +static IOFuncSelector +get_io_func_selector_from_format(int format) +{ + switch (format) + { + case 0: /* text format */ + return IOFunc_input; + case 1: /* binary format */ + return IOFunc_receive; + default: + /* reserved for future, so likely won't happen */ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("unexpected format of data node response"))); + } + + pg_unreachable(); + /* Silence Windows compiler */ + return IOFunc_input; +} + +static void +fill_in_result_error(Datum values[Natts_health], bool nulls[Natts_health], const char *errormsg) +{ + values[AttrNumberGetAttrOffset(Anum_health_error)] = CStringGetTextDatum(errormsg); + nulls[AttrNumberGetAttrOffset(Anum_health_error)] = false; +} + +Datum +ts_dist_health_check(PG_FUNCTION_ARGS) +{ + DistCmdResult *result; + FuncCallContext *funcctx; + HeapTuple tuple = NULL; + + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + TupleDesc tupdesc; + + funcctx = SRF_FIRSTCALL_INIT(); + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + if (dist_util_membership() == DIST_MEMBER_ACCESS_NODE) + { + List *data_node_list; + StringInfo cmd = makeStringInfo(); + Oid fnamespaceid = get_func_namespace(fcinfo->flinfo->fn_oid); + + appendStringInfo(cmd, + "SELECT * FROM %s.%s()", + get_namespace_name(fnamespaceid), + get_func_name(fcinfo->flinfo->fn_oid)); + data_node_list = data_node_get_node_name_list(); + result = ts_dist_cmd_invoke_on_data_nodes_using_search_path(cmd->data, + NULL, + data_node_list, + true); + funcctx->user_fctx = result; + list_free(data_node_list); + } + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + + switch (dist_util_membership()) + { + case DIST_MEMBER_ACCESS_NODE: + result = funcctx->user_fctx; + unsigned int call_cnt = funcctx->call_cntr; + + if (call_cnt == 0) + { + /* On call count 0, produce a tuple for health of AN itself */ + tuple = create_health_check_tuple(NULL, RecoveryInProgress(), funcctx->tuple_desc); + } + else if (result == NULL) + { + /* No result from data nodes, so nothing to do. This probably + * cannot happen in practice. */ + SRF_RETURN_DONE(funcctx); + } + else if (call_cnt > ts_dist_cmd_response_count(result)) + { + /* No more responses so no more tuples to produce */ + ts_dist_cmd_close_response(result); + funcctx->user_fctx = NULL; + SRF_RETURN_DONE(funcctx); + } + else + { + /* + * Produce a tuple from a data node's response. + * + * TODO: Currently, the remote commands to data nodes will + * either succeed or throw an error if one of the data nodes + * cannot be contacted. Therefore, the access node will never + * produce a result showing a data node as unhealthy (unless a + * node is in recovery). This needs to be changed so that + * connection issues (or errors from data node) won't result + * in throwing an error here. Instead, the result should be + * "unhealthy" with the appropriate error string if available. + */ + const char *node_name = ""; + NameData data_node_name; + PGresult *pgres = ts_dist_cmd_get_result_by_index(result, call_cnt - 1, &node_name); + TupleDesc return_tupdesc = funcctx->tuple_desc; + Datum values[Natts_health]; + bool nulls[Natts_health]; + + memset(nulls, true, sizeof(nulls)); + + namestrcpy(&data_node_name, node_name); + nulls[AttrNumberGetAttrOffset(Anum_health_node_name)] = false; + values[AttrNumberGetAttrOffset(Anum_health_node_name)] = + NameGetDatum(&data_node_name); + if (PQresultStatus(pgres) != PGRES_TUPLES_OK) + { + values[AttrNumberGetAttrOffset(Anum_health_error)] = + CStringGetTextDatum(PQresultErrorMessage(pgres)); + nulls[AttrNumberGetAttrOffset(Anum_health_error)] = false; + } + else if (PQnfields(pgres) != funcctx->tuple_desc->natts) + { + StringInfo error = makeStringInfo(); + appendStringInfo(error, + "unexpected number of fields in data node response (%d vs %d) " + "%s", + PQnfields(pgres), + funcctx->tuple_desc->natts, + PQgetvalue(pgres, 0, 0)); + + fill_in_result_error(values, nulls, error->data); + } + else if (PQntuples(pgres) != 1) + { + StringInfo error = makeStringInfo(); + appendStringInfo(error, + "unexpected number of rows in data node response (%d vs %d)", + PQntuples(pgres), + 1); + + fill_in_result_error(values, nulls, error->data); + } + else + { + int i; + + for (i = 0; i < return_tupdesc->natts; i++) + { + /* Data nodes don't return their own name, and it was already filled in */ + if (i == AttrNumberGetAttrOffset(Anum_health_node_name)) + { + Assert(PQgetisnull(pgres, 0 /* row */, i /* column */) == 1); + } + else if (PQgetisnull(pgres, 0 /* row */, i /* column */) == 1) + { + nulls[i] = true; + } + else + { + Oid typid = PQftype(pgres, i); + int format = PQfformat(pgres, i); + IOFuncSelector iofuncsel = get_io_func_selector_from_format(format); + int16 typlen; + bool typbyval; + char typalign; + char typdelim; + Oid typioparam; + Oid typfuncid; + + if (typid != return_tupdesc->attrs[i].atttypid) + { + StringInfo error = makeStringInfo(); + appendStringInfo(error, + "unexpected field type in data node response %u " + "vs %u", + typid, + return_tupdesc->attrs[i].attrelid); + + fill_in_result_error(values, nulls, error->data); + break; + } + + get_type_io_data(typid, + iofuncsel, + &typlen, + &typbyval, + &typalign, + &typdelim, + &typioparam, + &typfuncid); + + if (iofuncsel == IOFunc_receive) + { + StringInfo data = makeStringInfo(); + appendBinaryStringInfo(data, + PQgetvalue(pgres, 0, i), + PQgetlength(pgres, 0, i)); + values[i] = OidReceiveFunctionCall(typfuncid, + data, + typioparam, + PQfmod(pgres, i)); + } + else + { + Assert(iofuncsel == IOFunc_input); + values[i] = OidInputFunctionCall(typfuncid, + PQgetvalue(pgres, 0, i), + typioparam, + PQfmod(pgres, i)); + } + + nulls[i] = false; + } + } + } + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + } + break; + case DIST_MEMBER_DATA_NODE: + case DIST_MEMBER_NONE: + if (funcctx->call_cntr > 0) + SRF_RETURN_DONE(funcctx); + + tuple = create_health_check_tuple(NULL, RecoveryInProgress(), funcctx->tuple_desc); + break; + } + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); +} diff --git a/tsl/src/remote/healthcheck.h b/tsl/src/remote/healthcheck.h new file mode 100644 index 000000000..5298013b0 --- /dev/null +++ b/tsl/src/remote/healthcheck.h @@ -0,0 +1,13 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_REMOTE_HEALTHCHECK_H + +#include +#include + +extern Datum ts_dist_health_check(PG_FUNCTION_ARGS); + +#endif /* TIMESCALEDB_TSL_REMOTE_HEALTHCHECK_H */ diff --git a/tsl/test/expected/dist_commands.out b/tsl/test/expected/dist_commands.out index 3d1f10063..4264942ba 100644 --- a/tsl/test/expected/dist_commands.out +++ b/tsl/test/expected/dist_commands.out @@ -330,6 +330,16 @@ CALL distributed_exec('SELECT 1'); ERROR: function must be run on the access node only \set ON_ERROR_STOP 1 \c :TEST_DBNAME :ROLE_SUPERUSER +-- Test health check function output on access node +SELECT * FROM _timescaledb_internal.health() ORDER BY 1 NULLS FIRST; + node_name | healthy | in_recovery | error +--------------------+---------+-------------+------- + | t | f | + db_dist_commands_1 | t | f | + db_dist_commands_2 | t | f | + db_dist_commands_3 | t | f | +(4 rows) + SELECT * FROM delete_data_node(:'DATA_NODE_1'); delete_data_node ------------------ @@ -348,6 +358,13 @@ SELECT * FROM delete_data_node(:'DATA_NODE_3'); t (1 row) +-- Test health check when no longer an access node (no data nodes) +SELECT * FROM _timescaledb_internal.health() ORDER BY 1 NULLS FIRST; + node_name | healthy | in_recovery | error +-----------+---------+-------------+------- + | t | f | +(1 row) + DROP DATABASE :DATA_NODE_1; DROP DATABASE :DATA_NODE_2; DROP DATABASE :DATA_NODE_3; diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index e1d4874a6..effbfc829 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -81,6 +81,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_internal.get_partition_for_key(anyelement) _timescaledb_internal.get_partition_hash(anyelement) _timescaledb_internal.get_time_type(integer) + _timescaledb_internal.health() _timescaledb_internal.hist_combinefunc(internal,internal) _timescaledb_internal.hist_deserializefunc(bytea,internal) _timescaledb_internal.hist_finalfunc(internal,double precision,double precision,double precision,integer) diff --git a/tsl/test/sql/dist_commands.sql b/tsl/test/sql/dist_commands.sql index 6f74b89c9..6c845f6ef 100644 --- a/tsl/test/sql/dist_commands.sql +++ b/tsl/test/sql/dist_commands.sql @@ -172,9 +172,17 @@ CALL distributed_exec('SELECT 1'); \set ON_ERROR_STOP 1 \c :TEST_DBNAME :ROLE_SUPERUSER + +-- Test health check function output on access node +SELECT * FROM _timescaledb_internal.health() ORDER BY 1 NULLS FIRST; + SELECT * FROM delete_data_node(:'DATA_NODE_1'); SELECT * FROM delete_data_node(:'DATA_NODE_2'); SELECT * FROM delete_data_node(:'DATA_NODE_3'); + +-- Test health check when no longer an access node (no data nodes) +SELECT * FROM _timescaledb_internal.health() ORDER BY 1 NULLS FIRST; + DROP DATABASE :DATA_NODE_1; DROP DATABASE :DATA_NODE_2; DROP DATABASE :DATA_NODE_3; diff --git a/tsl/test/t/007_healthcheck.pl b/tsl/test/t/007_healthcheck.pl new file mode 100644 index 000000000..edf983303 --- /dev/null +++ b/tsl/test/t/007_healthcheck.pl @@ -0,0 +1,42 @@ +# This file and its contents are licensed under the Timescale License. +# Please see the included NOTICE for copyright information and +# LICENSE-TIMESCALE for a copy of the license. + +use strict; +use warnings; +use AccessNode; +use DataNode; +use TestLib; +use Test::More tests => 4; + +#Initialize all the multi-node instances +my $an = AccessNode->create('an'); +my $dn1 = DataNode->create('dn1'); +my $dn2 = DataNode->create('dn2'); + +$an->add_data_node($dn1); +$an->add_data_node($dn2); + +$an->psql_is( + 'postgres', + 'SELECT * FROM _timescaledb_internal.health() ORDER BY 1 NULLS FIRST', + q[|t|f| +dn1|t|f| +dn2|t|f|], 'Health check shows healthy AN and two healthy DNs'); + +# Stop a data node to simulate failure +$dn1->stop('fast'); + +# Health check will currently fail with an error when a data node +# cannot be contacted. This should be fixed so that the health check +# instead returns a negative status for the node. +my ($ret, $stdout, $stderr) = $an->psql('postgres', + 'SELECT * FROM _timescaledb_internal.health() ORDER BY 1 NULLS FIRST;'); + +# psql return error code 3 in case of failure in script +is($ret, qq(3), "expect error code 3 due to failed data node"); + +done_testing(); + +1; + diff --git a/tsl/test/t/CMakeLists.txt b/tsl/test/t/CMakeLists.txt index 76cd6fc6e..57c45e820 100644 --- a/tsl/test/t/CMakeLists.txt +++ b/tsl/test/t/CMakeLists.txt @@ -1,6 +1,7 @@ set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl) -set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl - 005_add_data_node.pl 006_job_crash_log.pl) +set(PROVE_DEBUG_TEST_FILES + 002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl 005_add_data_node.pl + 006_job_crash_log.pl 007_healthcheck.pl) if(CMAKE_BUILD_TYPE MATCHES Debug) list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES})