1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-19 20:24:46 +08:00

Add function to alter data nodes

Add a new function, `alter_data_node()`, which can be used to change
the data node's configuration originally set up via `add_data_node()`
on the access node.

The new functions introduces a new option "available" that allows
configuring the availability of the data node. Setting
`available=>false` means that the node should no longer be used for
reads and writes. Only read "failover" is implemented as part of this
change, however.

To fail over reads, the alter data node function finds all the chunks
for which the unavailable data node is the "primary" query target and
"fails over" to a chunk replica on another data node instead. If some
chunks do not have a replica to fail over to, a warning will be
raised.

When a data node is available again, the function can be used to
switch back to using the data node for queries.

Closes 
This commit is contained in:
Erik Nordström 2022-10-14 11:09:53 +02:00 committed by Erik Nordström
parent fe6731cead
commit f13214891c
25 changed files with 1050 additions and 43 deletions

@ -216,3 +216,12 @@ CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate(
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';
CREATE OR REPLACE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
available BOOLEAN = NULL
) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, available BOOLEAN)
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;

@ -255,6 +255,8 @@ CREATE TABLE _timescaledb_catalog.chunk_data_node (
CONSTRAINT chunk_data_node_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id)
);
CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_data_node', '');
-- Default jobs are given the id space [1,1000). User-installed jobs and any jobs created inside tests

@ -379,3 +379,14 @@ GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;
-- end recreate _timescaledb_catalog.dimension table --
-- changes related to alter_data_node():
CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);
CREATE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
available BOOLEAN = NULL
) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, available BOOLEAN)
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;

@ -305,3 +305,7 @@ GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;
-- end recreate _timescaledb_catalog.dimension table --
-- changes related to alter_data_node()
DROP INDEX _timescaledb_catalog.chunk_data_node_node_name_idx;
DROP FUNCTION @extschema@.alter_data_node;

@ -102,6 +102,7 @@ CROSSMODULE_WRAPPER(data_node_add);
CROSSMODULE_WRAPPER(data_node_delete);
CROSSMODULE_WRAPPER(data_node_attach);
CROSSMODULE_WRAPPER(data_node_detach);
CROSSMODULE_WRAPPER(data_node_alter);
CROSSMODULE_WRAPPER(chunk_drop_replica);
CROSSMODULE_WRAPPER(chunk_set_default_data_node);
@ -505,6 +506,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.data_node_attach = error_no_default_fn_pg_community,
.data_node_ping = error_no_default_fn_pg_community,
.data_node_detach = error_no_default_fn_pg_community,
.data_node_alter = error_no_default_fn_pg_community,
.data_node_allow_new_chunks = error_no_default_fn_pg_community,
.data_node_block_new_chunks = error_no_default_fn_pg_community,
.distributed_exec = error_no_default_fn_pg_community,

@ -158,6 +158,7 @@ typedef struct CrossModuleFunctions
PGFunction data_node_attach;
PGFunction data_node_ping;
PGFunction data_node_detach;
PGFunction data_node_alter;
PGFunction data_node_allow_new_chunks;
PGFunction data_node_block_new_chunks;

@ -2711,7 +2711,16 @@ ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cub
ts_hypercube_get_slice_by_dimension_id(cube, space_dim->fd.id);
const DimensionPartition *dp =
ts_dimension_partition_find(space_dim->dimension_partitions, slice->fd.range_start);
chunk_data_nodes = dp->data_nodes;
ListCell *lc;
/* Filter out data nodes that aren't available */
foreach (lc, dp->data_nodes)
{
char *node_name = lfirst(lc);
if (ts_data_node_is_available(node_name))
chunk_data_nodes = lappend(chunk_data_nodes, node_name);
}
}
else
{
@ -2758,6 +2767,9 @@ typedef bool (*hypertable_data_node_filter)(const HypertableDataNode *hdn);
static bool
filter_non_blocked_data_nodes(const HypertableDataNode *node)
{
if (!ts_data_node_is_available(NameStr(node->fd.node_name)))
return false;
return !node->fd.block_chunks;
}

@ -153,7 +153,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht,
int64 compress_interval);
extern TSDLLEXPORT void ts_hypertable_clone_constraints_to_compressed(const Hypertable *ht,
List *constraint_list);
extern List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht,
const Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_get_data_node_name_list(const Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_data_node_serverids_list(const Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_available_data_nodes(const Hypertable *ht,

@ -4,6 +4,7 @@
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <foreign/foreign.h>
#include <nodes/parsenodes.h>
#include <nodes/nodes.h>
#include <nodes/makefuncs.h>
@ -43,6 +44,7 @@
#include "annotations.h"
#include "export.h"
#include "extension_constants.h"
#include "process_utility.h"
#include "ts_catalog/catalog.h"
#include "chunk.h"
@ -495,14 +497,32 @@ static DDLResult
process_alter_foreign_server(ProcessUtilityArgs *args)
{
AlterForeignServerStmt *stmt = (AlterForeignServerStmt *) args->parsetree;
ForeignServer *server = GetForeignServerByName(stmt->servername, true);
Oid fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
ListCell *lc;
if (stmt->has_version)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("operation not supported"),
errdetail("It is not possible to set a version on the data node configuration.")));
if (server != NULL && server->fdwid == fdwid)
{
if (stmt->has_version)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("version not supported"),
errdetail(
"It is not possible to set a version on the data node configuration.")));
/* Other options are validated by the FDW */
/* Options are validated by the FDW, but we need to block available option
* since that must be handled via alter_data_node(). */
foreach (lc, stmt->options)
{
DefElem *elem = lfirst(lc);
if (strcmp(elem->defname, "available") == 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot set \"available\" using ALTER SERVER"),
errhint("Use alter_data_node() to set \"available\".")));
}
}
return DDL_CONTINUE;
}

@ -190,6 +190,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
.names = (char *[]) {
[CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_NAME_IDX] = "chunk_data_node_node_name_idx",
}
},
[TABLESPACE] = {

@ -598,6 +598,7 @@ enum
{
CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_NAME_IDX,
_MAX_CHUNK_DATA_NODE_INDEX,
};
@ -627,6 +628,17 @@ struct FormData_chunk_data_node_node_chunk_id_node_name_idx
NameData node_name;
};
enum Anum_chunk_data_node_node_name_idx
{
Anum_chunk_data_node_name_idx_node_name = 1,
_Anum_chunk_data_node_node_name_idx_max,
};
struct FormData_chunk_data_node_node_name_idx
{
NameData node_name;
};
/************************************
*
* Tablespace table definitions
@ -1460,7 +1472,7 @@ extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid
extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
extern TSDLLEXPORT void ts_catalog_delete_only(Relation rel, HeapTuple tuple);
extern TSDLLEXPORT void ts_catalog_delete(Relation rel, HeapTuple tuple);
extern void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode,

@ -106,14 +106,15 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data)
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple);
ForeignServer *server;
ChunkDataNode *chunk_data_node;
MemoryContext old;
server = GetForeignServerByName(NameStr(form->node_name), false);
old = MemoryContextSwitchTo(ti->mctx);
chunk_data_node = palloc(sizeof(ChunkDataNode));
memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node));
chunk_data_node->foreign_server_oid =
get_foreign_server_oid(NameStr(form->node_name), /* missing_ok = */ false);
chunk_data_node->foreign_server_oid = server->serverid;
*nodes = lappend(*nodes, chunk_data_node);
MemoryContextSwitchTo(old);
@ -339,3 +340,16 @@ ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id)
F_INT4EQ,
Int32GetDatum(chunk_id));
}
void
ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name)
{
it->ctx.index =
catalog_get_index(ts_catalog_get(), CHUNK_DATA_NODE, CHUNK_DATA_NODE_NODE_NAME_IDX);
ts_scan_iterator_scan_key_reset(it);
ts_scan_iterator_scan_key_init(it,
Anum_chunk_data_node_name_idx_node_name,
BTEqualStrategyNumber,
F_NAMEEQ,
CStringGetDatum(node_name));
}

@ -32,7 +32,10 @@ extern int ts_chunk_data_node_delete_by_node_name(const char *node_name);
extern TSDLLEXPORT List *
ts_chunk_data_node_scan_by_node_name_and_hypertable_id(const char *node_name, int32 hypertable_id,
MemoryContext mctx);
extern ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id);
extern TSDLLEXPORT ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it,
int32 chunk_id);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it,
const char *node_name);
#endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */

@ -1302,3 +1302,26 @@ ts_copy_relation_acl(const Oid source_relid, const Oid target_relid, const Oid o
ReleaseSysCache(source_tuple);
table_close(class_rel, RowExclusiveLock);
}
bool
ts_data_node_is_available_by_server(const ForeignServer *server)
{
ListCell *lc;
foreach (lc, server->options)
{
DefElem *elem = lfirst(lc);
if (strcmp(elem->defname, "available") == 0)
return defGetBoolean(elem);
}
/* Default to available if option is not yet added */
return true;
}
bool
ts_data_node_is_available(const char *name)
{
return ts_data_node_is_available_by_server(GetForeignServerByName(name, false));
}

@ -10,6 +10,7 @@
#include <access/htup_details.h>
#include <catalog/pg_proc.h>
#include <common/int.h>
#include <foreign/foreign.h>
#include <nodes/pathnodes.h>
#include <nodes/extensible.h>
#include <utils/datetime.h>
@ -201,5 +202,7 @@ extern TSDLLEXPORT void ts_alter_table_with_event_trigger(Oid relid, Node *cmd,
bool recurse);
extern TSDLLEXPORT void ts_copy_relation_acl(const Oid source_relid, const Oid target_relid,
const Oid owner_id);
extern TSDLLEXPORT bool ts_data_node_is_available_by_server(const ForeignServer *server);
extern TSDLLEXPORT bool ts_data_node_is_available(const char *node_name);
#endif /* TIMESCALEDB_UTILS_H */

@ -13,6 +13,7 @@
#include <access/htup_details.h>
#include <access/xact.h>
#include <nodes/makefuncs.h>
#include <nodes/parsenodes.h>
#include <utils/acl.h>
#include <utils/builtins.h>
#include <utils/syscache.h>
@ -36,6 +37,7 @@
#include <errors.h>
#include <error_utils.h>
#include <hypertable_cache.h>
#include "hypercube.h"
#include "chunk.h"
#include "chunk_api.h"
@ -44,6 +46,7 @@
#include "dist_util.h"
#include "remote/dist_commands.h"
#include "ts_catalog/chunk_data_node.h"
#include "utils.h"
static bool
chunk_match_data_node_by_server(const Chunk *chunk, const ForeignServer *server)
@ -66,7 +69,7 @@ chunk_match_data_node_by_server(const Chunk *chunk, const ForeignServer *server)
}
static bool
chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server)
chunk_set_foreign_server(const Chunk *chunk, const ForeignServer *new_server)
{
Relation ftrel;
HeapTuple tuple;
@ -134,32 +137,134 @@ chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server)
return true;
}
void
chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id)
/*
* Change the data node used to query a chunk.
*
* Either switch "away" from using the given data node or switch to using it
* (depending on the "available" parameter). The function will only switch
* back to using the data node if it is the determined primary/default data
* node for the chunk according to the partitioning configuration.
*
* Return true if the chunk's data node was changed or no change was
* needed. Return false if a change should have been made but wasn't possible
* (due to, e.g., lack of replica chunks).
*/
bool
chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid data_node_id, bool available)
{
ListCell *lc;
ChunkDataNode *new_server = NULL;
Chunk *chunk = ts_chunk_get_by_id(chunk_id, true);
ForeignTable *foreign_table = NULL;
ForeignServer *server = NULL;
bool should_switch_data_node = false;
ListCell *lc;
Assert(chunk->relkind == RELKIND_FOREIGN_TABLE);
foreign_table = GetForeignTable(chunk->table_id);
/* no need to update since foreign table doesn't reference server we try to remove */
if (existing_server_id != foreign_table->serverid)
return;
/* Cannot switch to other data node if only one or none assigned */
if (list_length(chunk->data_nodes) < 2)
return false;
Assert(list_length(chunk->data_nodes) > 1);
/* Nothing to do if the chunk table already has the requested data node set */
if ((!available && data_node_id != foreign_table->serverid) ||
(available && data_node_id == foreign_table->serverid))
return true;
foreach (lc, chunk->data_nodes)
if (available)
{
new_server = lfirst(lc);
if (new_server->foreign_server_oid != existing_server_id)
break;
}
Assert(new_server != NULL);
/* Switch to using the given data node, but only on chunks where the
* given node is the "default" according to partitioning */
Cache *htcache = ts_hypertable_cache_pin();
const Hypertable *ht =
ts_hypertable_cache_get_entry(htcache, chunk->hypertable_relid, CACHE_FLAG_NONE);
const Dimension *dim = hyperspace_get_closed_dimension(ht->space, 0);
chunk_set_foreign_server(chunk, GetForeignServer(new_server->foreign_server_oid));
if (dim != NULL)
{
/* For space-partitioned tables, use the current partitioning
* configuration in that dimension (dimension partition) as a
* template for picking the query data node */
const DimensionSlice *slice =
ts_hypercube_get_slice_by_dimension_id(chunk->cube, dim->fd.id);
unsigned int i;
Assert(dim->dimension_partitions);
for (i = 0; i < dim->dimension_partitions->num_partitions; i++)
{
const DimensionPartition *dp = dim->dimension_partitions->partitions[i];
/* Match the chunk with the dimension partition. Count as a
* match if the start of chunk is within the range of the
* partition. This captures both the case when the chunk
* aligns perfectly with the partition and when it is bigger
* or smaller (due to a previous partitioning change). */
if (slice->fd.range_start >= dp->range_start &&
slice->fd.range_start <= dp->range_end)
{
ListCell *lc;
/* Use the data node for queries if it is the first
* available data node in the partition's list (i.e., the
* default choice) */
foreach (lc, dp->data_nodes)
{
const char *node_name = lfirst(lc);
server = GetForeignServerByName(node_name, false);
if (ts_data_node_is_available_by_server(server))
{
should_switch_data_node = (server->serverid == data_node_id);
break;
}
}
}
}
}
else
{
/* For hypertables without a space partition, use the data node
* assignment logic to figure out whether to use the data node as
* query data node. The "default" query data node is the first in
* the list. The chunk assign logic only returns available data
* nodes. */
List *datanodes = ts_hypertable_assign_chunk_data_nodes(ht, chunk->cube);
const char *node_name = linitial(datanodes);
server = GetForeignServerByName(node_name, false);
if (server->serverid == data_node_id)
should_switch_data_node = true;
}
ts_cache_release(htcache);
}
else
{
/* Switch "away" from using the given data node. Pick the first
* "available" data node referenced by the chunk */
foreach (lc, chunk->data_nodes)
{
const ChunkDataNode *cdn = lfirst(lc);
if (cdn->foreign_server_oid != data_node_id)
{
server = GetForeignServer(cdn->foreign_server_oid);
if (ts_data_node_is_available_by_server(server))
{
should_switch_data_node = true;
break;
}
}
}
}
if (should_switch_data_node)
{
Assert(server != NULL);
chunk_set_foreign_server(chunk, server);
}
return should_switch_data_node;
}
Datum

@ -10,7 +10,8 @@
#include <fmgr.h>
#include <chunk.h>
extern void chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id);
extern bool chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid data_node_id,
bool available);
extern Datum chunk_set_default_data_node(PG_FUNCTION_ARGS);
extern Datum chunk_drop_replica(PG_FUNCTION_ARGS);
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);

@ -1767,6 +1767,6 @@ chunk_api_call_chunk_drop_replica(const Chunk *chunk, const char *node_name, Oid
* This chunk might have this data node as primary, change that association
* if so. Then delete the chunk_id and node_name association.
*/
chunk_update_foreign_server_if_needed(chunk->fd.id, serverid);
chunk_update_foreign_server_if_needed(chunk, serverid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(chunk->fd.id, node_name);
}

@ -3,9 +3,7 @@
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include "cache.h"
#include <postgres.h>
#include <access/htup_details.h>
#include <access/xact.h>
#include <catalog/namespace.h>
@ -17,22 +15,30 @@
#include <commands/defrem.h>
#include <commands/event_trigger.h>
#include <compat/compat.h>
#include <executor/tuptable.h>
#include <extension.h>
#include <fmgr.h>
#include <funcapi.h>
#include <libpq/crypt.h>
#include <miscadmin.h>
#include <nodes/makefuncs.h>
#include <nodes/parsenodes.h>
#include <nodes/nodes.h>
#include <nodes/value.h>
#include <utils/acl.h>
#include <utils/builtins.h>
#include <utils/array.h>
#include <utils/builtins.h>
#include <utils/guc.h>
#include <utils/inval.h>
#include <utils/palloc.h>
#include <utils/syscache.h>
#include "compat/compat.h"
#include "config.h"
#include "extension.h"
#include "cache.h"
#include "chunk.h"
#include "fdw/fdw.h"
#include "remote/async.h"
#include "remote/connection.h"
@ -45,7 +51,8 @@
#include "dist_util.h"
#include "utils/uuid.h"
#include "mb/pg_wchar.h"
#include "chunk.h"
#include "scan_iterator.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/chunk_data_node.h"
#include "ts_catalog/dimension_partition.h"
#include "ts_catalog/hypertable_data_node.h"
@ -678,6 +685,16 @@ get_server_port()
return pg_strtoint32(portstr);
}
static void
validate_data_node_port(int port)
{
if (port < 1 || port > PG_UINT16_MAX)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errmsg("invalid port number %d", port),
errhint("The port number must be between 1 and %u.", PG_UINT16_MAX))));
}
/* set_distid may need to be false for some otherwise invalid configurations
* that are useful for testing */
static Datum
@ -718,11 +735,7 @@ data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errmsg("data node name cannot be NULL"))));
if (port < 1 || port > PG_UINT16_MAX)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errmsg("invalid port number %d", port),
errhint("The port number must be between 1 and %u.", PG_UINT16_MAX))));
validate_data_node_port(port);
result = get_database_info(MyDatabaseId, &database);
Assert(result);
@ -1134,8 +1147,8 @@ data_node_modify_hypertable_data_nodes(const char *node_name, List *hypertable_d
foreach (cs_lc, chunk_data_nodes)
{
ChunkDataNode *cdn = lfirst(cs_lc);
chunk_update_foreign_server_if_needed(cdn->fd.chunk_id, cdn->foreign_server_oid);
const Chunk *chunk = ts_chunk_get_by_id(cdn->fd.chunk_id, true);
chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
NameStr(cdn->fd.node_name));
}
@ -1415,6 +1428,240 @@ data_node_detach(PG_FUNCTION_ARGS)
PG_RETURN_INT32(removed);
}
enum Anum_show_conn
{
Anum_alter_data_node_node_name = 1,
Anum_alter_data_node_host,
Anum_alter_data_node_port,
Anum_alter_data_node_database,
Anum_alter_data_node_available,
_Anum_alter_data_node_max,
};
#define Natts_alter_data_node (_Anum_alter_data_node_max - 1)
static HeapTuple
create_alter_data_node_tuple(TupleDesc tupdesc, const char *node_name, List *options)
{
Datum values[Natts_alter_data_node];
bool nulls[Natts_alter_data_node] = { false };
ListCell *lc;
MemSet(nulls, false, sizeof(nulls));
values[AttrNumberGetAttrOffset(Anum_alter_data_node_node_name)] = CStringGetDatum(node_name);
foreach (lc, options)
{
DefElem *elem = lfirst(lc);
if (strcmp("host", elem->defname) == 0)
{
values[AttrNumberGetAttrOffset(Anum_alter_data_node_host)] =
CStringGetTextDatum(defGetString(elem));
}
else if (strcmp("port", elem->defname) == 0)
{
int port = atoi(defGetString(elem));
values[AttrNumberGetAttrOffset(Anum_alter_data_node_port)] = Int32GetDatum(port);
}
else if (strcmp("dbname", elem->defname) == 0)
{
values[AttrNumberGetAttrOffset(Anum_alter_data_node_database)] =
CStringGetDatum(defGetString(elem));
}
else if (strcmp("available", elem->defname) == 0)
{
values[AttrNumberGetAttrOffset(Anum_alter_data_node_available)] =
BoolGetDatum(defGetBoolean(elem));
}
}
return heap_form_tuple(tupdesc, values, nulls);
}
/*
* Switch data node to use for queries on chunks.
*
* When available=false it will switch from the given data node to another
* one, but only if the data node is currently used for queries on the chunk.
*
* When available=true it will switch to the given data node, if it is
* "primary" for the chunk (according to the current partitioning
* configuration).
*/
static void
switch_data_node_on_chunks(const ForeignServer *datanode, bool available)
{
unsigned int failed_update_count = 0;
ScanIterator it = ts_chunk_data_nodes_scan_iterator_create(CurrentMemoryContext);
ts_chunk_data_nodes_scan_iterator_set_node_name(&it, datanode->servername);
/* Scan for chunks that reference the given data node */
ts_scanner_foreach(&it)
{
TupleTableSlot *slot = ts_scan_iterator_slot(&it);
bool PG_USED_FOR_ASSERTS_ONLY isnull = false;
Datum chunk_id = slot_getattr(slot, Anum_chunk_data_node_chunk_id, &isnull);
Assert(!isnull);
const Chunk *chunk = ts_chunk_get_by_id(DatumGetInt32(chunk_id), true);
if (!chunk_update_foreign_server_if_needed(chunk, datanode->serverid, available))
failed_update_count++;
}
if (!available && failed_update_count > 0)
elog(WARNING, "could not switch data node on %u chunks", failed_update_count);
ts_scan_iterator_close(&it);
}
/*
* Append new data node options.
*
* When setting options via AlterForeignServer(), the defelem list must
* account for whether the an option already exists (is set) in the current
* options or it is newly added. These are different operations on a foreign
* server.
*
* Any options that already exist are purged from the current_options list so
* that only the options not set or added remains. This list can be merged
* with the new options to produce the full list of options (new and old).
*/
static List *
append_data_node_option(List *new_options, List **current_options, const char *name, Node *value)
{
DefElem *elem;
ListCell *lc;
bool option_found = false;
#if PG13_LT
ListCell *prev_lc = NULL;
#endif
foreach (lc, *current_options)
{
elem = lfirst(lc);
if (strcmp(elem->defname, name) == 0)
{
option_found = true;
/* Remove the option which is replaced so that the remaining
* options can be merged later into an updated list */
#if PG13_GE
*current_options = list_delete_cell(*current_options, lc);
#else
*current_options = list_delete_cell(*current_options, lc, prev_lc);
#endif
break;
}
#if PG13_LT
prev_lc = lc;
#endif
}
elem = makeDefElemExtended(NULL,
pstrdup(name),
value,
option_found ? DEFELEM_SET : DEFELEM_ADD,
-1);
return lappend(new_options, elem);
}
/*
* Alter a data node.
*
* Change the configuration of a data node, including host, port, and
* database.
*
* Can also be used to mark a data node "unavailable", which ensures it is no
* longer used for reads as long as there are replica chunks on other data
* nodes to use for reads instead. If it is not possible to fail over all
* chunks, a warning will be raised.
*/
Datum
data_node_alter(PG_FUNCTION_ARGS)
{
const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
const char *host = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_TEXT_P(1));
const char *database = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2));
int port = PG_ARGISNULL(3) ? -1 : PG_GETARG_INT32(3);
bool available_is_null = PG_ARGISNULL(4);
bool available = available_is_null ? true : PG_GETARG_BOOL(4);
ForeignServer *server = NULL;
List *current_options = NIL;
List *options = NIL;
TupleDesc tupdesc;
AlterForeignServerStmt alter_server_stmt = {
.type = T_AlterForeignServerStmt,
.servername = node_name ? pstrdup(node_name) : NULL,
.has_version = false,
.version = NULL,
.options = NIL,
};
TS_PREVENT_FUNC_IF_READ_ONLY();
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
tupdesc = BlessTupleDesc(tupdesc);
/* Check if a data node with the given name actually exists, or raise an error. */
server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false /* missing_ok */);
if (host == NULL && database == NULL && port == -1 && available_is_null)
PG_RETURN_DATUM(
HeapTupleGetDatum(create_alter_data_node_tuple(tupdesc, node_name, server->options)));
current_options = list_copy(server->options);
if (host != NULL)
options = append_data_node_option(options,
&current_options,
"host",
(Node *) makeString((char *) host));
if (database != NULL)
options = append_data_node_option(options,
&current_options,
"dbname",
(Node *) makeString((char *) database));
if (port != -1)
{
validate_data_node_port(port);
options =
append_data_node_option(options, &current_options, "port", (Node *) makeInteger(port));
}
if (!available_is_null)
options = append_data_node_option(options,
&current_options,
"available",
(Node *) makeString(available ? "true" : "false"));
alter_server_stmt.options = options;
AlterForeignServer(&alter_server_stmt);
/* Make changes to the data node (foreign server object) visible so that
* the changes are present when we switch "primary" data node on chunks */
CommandCounterIncrement();
/* Update the currently used query data node on all affected chunks to
* reflect the new status of the data node */
switch_data_node_on_chunks(server, available);
/* Add updated options last as they will take precedence over old options
* when creating the result tuple. */
options = list_concat(current_options, options);
PG_RETURN_DATUM(HeapTupleGetDatum(create_alter_data_node_tuple(tupdesc, node_name, options)));
}
/*
* Drop a data node's database.
*

@ -28,6 +28,7 @@ extern Datum data_node_add(PG_FUNCTION_ARGS);
extern Datum data_node_delete(PG_FUNCTION_ARGS);
extern Datum data_node_attach(PG_FUNCTION_ARGS);
extern Datum data_node_detach(PG_FUNCTION_ARGS);
extern Datum data_node_alter(PG_FUNCTION_ARGS);
extern Datum data_node_block_new_chunks(PG_FUNCTION_ARGS);
extern Datum data_node_allow_new_chunks(PG_FUNCTION_ARGS);
extern List *data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck);

@ -132,6 +132,11 @@ option_validate(List *options_list, Oid catalog)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s requires a non-negative integer value", def->defname)));
}
else if (strcmp(def->defname, "available") == 0)
{
/* This will throw an error if not a boolean */
defGetBoolean(def);
}
}
}
@ -154,6 +159,7 @@ init_ts_fdw_options(void)
/* fetch_size is available on both foreign data wrapper and server */
{ "fetch_size", ForeignDataWrapperRelationId },
{ "fetch_size", ForeignServerRelationId },
{ "available", ForeignServerRelationId },
{ NULL, InvalidOid }
};

@ -190,6 +190,7 @@ CrossModuleFunctions tsl_cm_functions = {
.data_node_attach = data_node_attach,
.data_node_ping = data_node_ping,
.data_node_detach = data_node_detach,
.data_node_alter = data_node_alter,
.data_node_allow_new_chunks = data_node_allow_new_chunks,
.data_node_block_new_chunks = data_node_block_new_chunks,
.chunk_set_default_data_node = chunk_set_default_data_node,

@ -105,7 +105,10 @@ ROLLBACK;
\set ON_ERROR_STOP 0
-- Should not be possible to set a version:
ALTER SERVER data_node_3 VERSION '2';
ERROR: operation not supported
ERROR: version not supported
-- Should not be possible to set "available"
ALTER SERVER data_node_3 OPTIONS (SET available 'true');
ERROR: cannot set "available" using ALTER SERVER
\set ON_ERROR_STOP 1
-- Make sure changing server owner is allowed
ALTER SERVER data_node_1 OWNER TO CURRENT_USER;
@ -1591,3 +1594,367 @@ DROP DATABASE :DN_DBNAME_3;
DROP DATABASE :DN_DBNAME_4;
DROP DATABASE :DN_DBNAME_5;
DROP DATABASE :DN_DBNAME_6;
-----------------------------------------------
-- Test alter_data_node()
-----------------------------------------------
SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_1', host => 'localhost', database => :'DN_DBNAME_1');
node_name | database | node_created | database_created | extension_created
-------------+----------------+--------------+------------------+-------------------
data_node_1 | db_data_node_1 | t | t | t
(1 row)
SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_2', host => 'localhost', database => :'DN_DBNAME_2');
node_name | database | node_created | database_created | extension_created
-------------+----------------+--------------+------------------+-------------------
data_node_2 | db_data_node_2 | t | t | t
(1 row)
SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_3', host => 'localhost', database => :'DN_DBNAME_3');
node_name | database | node_created | database_created | extension_created
-------------+----------------+--------------+------------------+-------------------
data_node_3 | db_data_node_3 | t | t | t
(1 row)
GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO :ROLE_1;
SET ROLE :ROLE_1;
CREATE TABLE hyper1 (time timestamptz, location int, temp float);
CREATE TABLE hyper2 (LIKE hyper1);
CREATE TABLE hyper3 (LIKE hyper1);
CREATE TABLE hyper_1dim (LIKE hyper1);
SELECT create_distributed_hypertable('hyper1', 'time', 'location', replication_factor=>1);
NOTICE: adding not-null constraint to column "time"
create_distributed_hypertable
-------------------------------
(10,public,hyper1,t)
(1 row)
SELECT create_distributed_hypertable('hyper2', 'time', 'location', replication_factor=>2);
NOTICE: adding not-null constraint to column "time"
create_distributed_hypertable
-------------------------------
(11,public,hyper2,t)
(1 row)
SELECT create_distributed_hypertable('hyper3', 'time', 'location', replication_factor=>3);
NOTICE: adding not-null constraint to column "time"
create_distributed_hypertable
-------------------------------
(12,public,hyper3,t)
(1 row)
SELECT create_distributed_hypertable('hyper_1dim', 'time', chunk_time_interval=>interval '2 days', replication_factor=>3);
NOTICE: adding not-null constraint to column "time"
create_distributed_hypertable
-------------------------------
(13,public,hyper_1dim,t)
(1 row)
SELECT setseed(1);
setseed
---------
(1 row)
INSERT INTO hyper1
SELECT t, (abs(timestamp_hash(t::timestamp)) % 3) + 1, random() * 30
FROM generate_series('2022-01-01 00:00:00'::timestamptz, '2022-01-05 00:00:00', '1 h') t;
INSERT INTO hyper2 SELECT * FROM hyper1;
INSERT INTO hyper3 SELECT * FROM hyper1;
INSERT INTO hyper_1dim SELECT * FROM hyper1;
-- create view to see the data nodes and default data node of all
-- chunks
CREATE VIEW chunk_query_data_node AS
SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node
FROM timescaledb_information.chunks ch
INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid)
INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid)
ORDER BY 1, 2;
SELECT * FROM chunk_query_data_node;
hypertable_name | chunk | data_nodes | default_data_node
-----------------+-----------------------------------------------+---------------------------------------+-------------------
hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1
hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2
hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3
hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1
hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2
hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3
hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1
hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3
hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3
hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1
(12 rows)
-- test alter_data_node permissions
\set ON_ERROR_STOP 0
-- must be owner to alter a data node
SELECT * FROM alter_data_node('data_node_1', available=>false);
ERROR: must be owner of foreign server data_node_1
SELECT * FROM alter_data_node('data_node_1', port=>8989);
ERROR: must be owner of foreign server data_node_1
\set ON_ERROR_STOP 1
-- query some data from all hypertables to show its working before
-- simulating the node being down
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
-- simulate a node being down by renaming the database for
-- data_node_1, but for that to work we need to reconnect the backend
-- to clear out the connection cache
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
ALTER DATABASE :DN_DBNAME_1 RENAME TO data_node_1_unavailable;
WARNING: you need to manually restart any running background workers after this command
\set ON_ERROR_STOP 0
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
ERROR: could not connect to "data_node_1"
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
ERROR: could not connect to "data_node_1"
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
ERROR: could not connect to "data_node_1"
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
ERROR: could not connect to "data_node_1"
\set ON_ERROR_STOP 1
-- alter the node as not available
SELECT * FROM alter_data_node('data_node_1', available=>false);
WARNING: could not switch data node on 1 chunks
node_name | host | port | database | available
-------------+-----------+-------+----------------+-----------
data_node_1 | localhost | 55432 | db_data_node_1 | f
(1 row)
-- the node that is not available for reads should no longer be
-- query data node for chunks, except for those that have no
-- alternative (i.e., the chunk only has one data node).
SELECT * FROM chunk_query_data_node;
hypertable_name | chunk | data_nodes | default_data_node
-----------------+-----------------------------------------------+---------------------------------------+-------------------
hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1
hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2
hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3
hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_2
hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2
hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3
hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3
hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3
hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
(12 rows)
-- queries should work again, except on hyper1 which has no
-- replication
\set ON_ERROR_STOP 0
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
ERROR: could not connect to "data_node_1"
\set ON_ERROR_STOP 1
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
-- inserts should fail if going to chunks that exist on the
-- unavailable data node
\set ON_ERROR_STOP 0
INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1);
ERROR: could not connect to "data_node_1"
INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1);
ERROR: could not connect to "data_node_1"
\set ON_ERROR_STOP 1
-- inserts should work if going to a new chunk
INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1);
WARNING: insufficient number of data nodes
INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1);
WARNING: insufficient number of data nodes
SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks
WHERE hypertable_name IN ('hyper3', 'hyper_1dim')
AND range_start::timestamptz <= '2022-01-10 00:00:00'
AND range_end::timestamptz > '2022-01-10 00:00:00'
ORDER BY 1, 2;
hypertable_name | chunk_name | data_nodes
-----------------+-------------------------+---------------------------
hyper3 | _dist_hyper_12_24_chunk | {data_node_2,data_node_3}
hyper_1dim | _dist_hyper_13_25_chunk | {data_node_2,data_node_3}
(2 rows)
-- re-enable the data node and the chunks should "switch back" to
-- using the data node. However, only the chunks for which the node is
-- "primary" should switch to using the data node for queries
ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1;
WARNING: you need to manually restart any running background workers after this command
SELECT * FROM alter_data_node('data_node_1', available=>true);
node_name | host | port | database | available
-------------+-----------+-------+----------------+-----------
data_node_1 | localhost | 55432 | db_data_node_1 | t
(1 row)
SELECT * FROM chunk_query_data_node;
hypertable_name | chunk | data_nodes | default_data_node
-----------------+-----------------------------------------------+---------------------------------------+-------------------
hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1
hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2
hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3
hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1
hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2
hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3
hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1
hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3
hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2
hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2
hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3
hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1
hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2
(14 rows)
--queries should work again on all tables
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
time | location
------------------------------+----------
Sat Jan 01 00:00:00 2022 PST | 1
(1 row)
-- save old port so that we can restore connectivity after we test
-- changing the connection information for the data node
WITH options AS (
SELECT unnest(options) opt
FROM timescaledb_information.data_nodes
WHERE node_name = 'data_node_1'
)
SELECT split_part(opt, '=', 2) AS old_port
FROM options WHERE opt LIKE 'port%' \gset
-- also test altering host, port and database
SELECT node_name, options FROM timescaledb_information.data_nodes;
node_name | options
-------------+------------------------------------------------------------------
data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2}
data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3}
data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true}
(3 rows)
SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', port=>8989, database=>'new_db');
node_name | host | port | database | available
-------------+---------+------+----------+-----------
data_node_1 | foo.bar | 8989 | new_db | t
(1 row)
SELECT node_name, options FROM timescaledb_information.data_nodes;
node_name | options
-------------+-------------------------------------------------------
data_node_1 | {host=foo.bar,port=8989,dbname=new_db,available=true}
data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2}
data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3}
(3 rows)
-- just show current options:
SELECT * FROM alter_data_node('data_node_1');
node_name | host | port | database | available
-------------+---------+------+----------+-----------
data_node_1 | foo.bar | 8989 | new_db | t
(1 row)
\set ON_ERROR_STOP 0
-- test some error cases
SELECT * FROM alter_data_node(NULL);
ERROR: data node name cannot be NULL
SELECT * FROM alter_data_node('does_not_exist');
ERROR: server "does_not_exist" does not exist
SELECT * FROM alter_data_node('data_node_1', port=>89000);
ERROR: invalid port number 89000
-- cannot delete data node with "drop_database" since configuration is wrong
SELECT delete_data_node('data_node_1', drop_database=>true);
ERROR: could not connect to data node "data_node_1"
\set ON_ERROR_STOP 1
-- restore configuration for data_node_1
SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1');
node_name | host | port | database | available
-------------+-----------+-------+----------------+-----------
data_node_1 | localhost | 55432 | db_data_node_1 | t
(1 row)
SELECT node_name, options FROM timescaledb_information.data_nodes;
node_name | options
-------------+------------------------------------------------------------------
data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true}
data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2}
data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3}
(3 rows)
DROP TABLE hyper1;
DROP TABLE hyper2;
DROP TABLE hyper3;
DROP TABLE hyper_1dim;
DROP VIEW chunk_query_data_node;
-- create new session to clear out connection cache
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
SELECT delete_data_node('data_node_1', drop_database=>true);
delete_data_node
------------------
t
(1 row)
SELECT delete_data_node('data_node_2', drop_database=>true);
delete_data_node
------------------
t
(1 row)
SELECT delete_data_node('data_node_3', drop_database=>true);
delete_data_node
------------------
t
(1 row)

@ -155,6 +155,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text)
add_reorder_policy(regclass,name,boolean,timestamp with time zone,text)
add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text)
alter_data_node(name,text,name,integer,boolean)
alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean,regproc)
approximate_row_count(regclass)
attach_data_node(name,regclass,boolean,boolean)

@ -79,6 +79,8 @@ ROLLBACK;
\set ON_ERROR_STOP 0
-- Should not be possible to set a version:
ALTER SERVER data_node_3 VERSION '2';
-- Should not be possible to set "available"
ALTER SERVER data_node_3 OPTIONS (SET available 'true');
\set ON_ERROR_STOP 1
-- Make sure changing server owner is allowed
@ -777,3 +779,161 @@ DROP DATABASE :DN_DBNAME_3;
DROP DATABASE :DN_DBNAME_4;
DROP DATABASE :DN_DBNAME_5;
DROP DATABASE :DN_DBNAME_6;
-----------------------------------------------
-- Test alter_data_node()
-----------------------------------------------
SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_1', host => 'localhost', database => :'DN_DBNAME_1');
SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_2', host => 'localhost', database => :'DN_DBNAME_2');
SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_3', host => 'localhost', database => :'DN_DBNAME_3');
GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO :ROLE_1;
SET ROLE :ROLE_1;
CREATE TABLE hyper1 (time timestamptz, location int, temp float);
CREATE TABLE hyper2 (LIKE hyper1);
CREATE TABLE hyper3 (LIKE hyper1);
CREATE TABLE hyper_1dim (LIKE hyper1);
SELECT create_distributed_hypertable('hyper1', 'time', 'location', replication_factor=>1);
SELECT create_distributed_hypertable('hyper2', 'time', 'location', replication_factor=>2);
SELECT create_distributed_hypertable('hyper3', 'time', 'location', replication_factor=>3);
SELECT create_distributed_hypertable('hyper_1dim', 'time', chunk_time_interval=>interval '2 days', replication_factor=>3);
SELECT setseed(1);
INSERT INTO hyper1
SELECT t, (abs(timestamp_hash(t::timestamp)) % 3) + 1, random() * 30
FROM generate_series('2022-01-01 00:00:00'::timestamptz, '2022-01-05 00:00:00', '1 h') t;
INSERT INTO hyper2 SELECT * FROM hyper1;
INSERT INTO hyper3 SELECT * FROM hyper1;
INSERT INTO hyper_1dim SELECT * FROM hyper1;
-- create view to see the data nodes and default data node of all
-- chunks
CREATE VIEW chunk_query_data_node AS
SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node
FROM timescaledb_information.chunks ch
INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid)
INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid)
ORDER BY 1, 2;
SELECT * FROM chunk_query_data_node;
-- test alter_data_node permissions
\set ON_ERROR_STOP 0
-- must be owner to alter a data node
SELECT * FROM alter_data_node('data_node_1', available=>false);
SELECT * FROM alter_data_node('data_node_1', port=>8989);
\set ON_ERROR_STOP 1
-- query some data from all hypertables to show its working before
-- simulating the node being down
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
-- simulate a node being down by renaming the database for
-- data_node_1, but for that to work we need to reconnect the backend
-- to clear out the connection cache
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
ALTER DATABASE :DN_DBNAME_1 RENAME TO data_node_1_unavailable;
\set ON_ERROR_STOP 0
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
\set ON_ERROR_STOP 1
-- alter the node as not available
SELECT * FROM alter_data_node('data_node_1', available=>false);
-- the node that is not available for reads should no longer be
-- query data node for chunks, except for those that have no
-- alternative (i.e., the chunk only has one data node).
SELECT * FROM chunk_query_data_node;
-- queries should work again, except on hyper1 which has no
-- replication
\set ON_ERROR_STOP 0
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
\set ON_ERROR_STOP 1
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
-- inserts should fail if going to chunks that exist on the
-- unavailable data node
\set ON_ERROR_STOP 0
INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1);
INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1);
\set ON_ERROR_STOP 1
-- inserts should work if going to a new chunk
INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1);
INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1);
SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks
WHERE hypertable_name IN ('hyper3', 'hyper_1dim')
AND range_start::timestamptz <= '2022-01-10 00:00:00'
AND range_end::timestamptz > '2022-01-10 00:00:00'
ORDER BY 1, 2;
-- re-enable the data node and the chunks should "switch back" to
-- using the data node. However, only the chunks for which the node is
-- "primary" should switch to using the data node for queries
ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1;
SELECT * FROM alter_data_node('data_node_1', available=>true);
SELECT * FROM chunk_query_data_node;
--queries should work again on all tables
SELECT time, location FROM hyper1 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper2 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper3 ORDER BY time LIMIT 1;
SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1;
-- save old port so that we can restore connectivity after we test
-- changing the connection information for the data node
WITH options AS (
SELECT unnest(options) opt
FROM timescaledb_information.data_nodes
WHERE node_name = 'data_node_1'
)
SELECT split_part(opt, '=', 2) AS old_port
FROM options WHERE opt LIKE 'port%' \gset
-- also test altering host, port and database
SELECT node_name, options FROM timescaledb_information.data_nodes;
SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', port=>8989, database=>'new_db');
SELECT node_name, options FROM timescaledb_information.data_nodes;
-- just show current options:
SELECT * FROM alter_data_node('data_node_1');
\set ON_ERROR_STOP 0
-- test some error cases
SELECT * FROM alter_data_node(NULL);
SELECT * FROM alter_data_node('does_not_exist');
SELECT * FROM alter_data_node('data_node_1', port=>89000);
-- cannot delete data node with "drop_database" since configuration is wrong
SELECT delete_data_node('data_node_1', drop_database=>true);
\set ON_ERROR_STOP 1
-- restore configuration for data_node_1
SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1');
SELECT node_name, options FROM timescaledb_information.data_nodes;
DROP TABLE hyper1;
DROP TABLE hyper2;
DROP TABLE hyper3;
DROP TABLE hyper_1dim;
DROP VIEW chunk_query_data_node;
-- create new session to clear out connection cache
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
SELECT delete_data_node('data_node_1', drop_database=>true);
SELECT delete_data_node('data_node_2', drop_database=>true);
SELECT delete_data_node('data_node_3', drop_database=>true);