diff --git a/src/chunk.c b/src/chunk.c index 3b1079508..6f399392e 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -1475,7 +1475,7 @@ ts_chunk_find_for_point(const Hypertable *ht, const Point *p) * Create a chunk through insertion of a tuple at a given point. */ Chunk * -ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema, +ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, const char *schema, const char *prefix) { /* @@ -1506,6 +1506,8 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche * release the lock early. */ UnlockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock); + if (found) + *found = true; return chunk; } @@ -1517,11 +1519,15 @@ ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *sche chunk = chunk_resurrect(ht, chunk_id); if (chunk != NULL) { + if (found) + *found = true; return chunk; } } /* Create the chunk normally. */ + if (found) + *found = false; if (hypertable_is_distributed_member(ht)) ereport(ERROR, (errcode(ERRCODE_TS_INTERNAL_ERROR), diff --git a/src/chunk.h b/src/chunk.h index 517444b34..5240f7092 100644 --- a/src/chunk.h +++ b/src/chunk.h @@ -145,8 +145,8 @@ typedef struct DisplayKeyData extern void ts_chunk_formdata_fill(FormData_chunk *fd, const TupleInfo *ti); extern Chunk *ts_chunk_find_for_point(const Hypertable *ht, const Point *p); -extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, const char *schema, - const char *prefix); +extern Chunk *ts_chunk_create_for_point(const Hypertable *ht, const Point *p, bool *found, + const char *schema, const char *prefix); List *ts_chunk_id_find_in_subspace(Hypertable *ht, List *dimension_vecs); extern TSDLLEXPORT Chunk *ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind); diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index f585ce3fc..05891cfb4 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -387,6 +387,13 @@ update_compressed_chunk_relstats_default(Oid uncompressed_relid, Oid compressed_ error_no_default_fn_community(); } +static void +dist_update_stale_chunk_metadata_default(Chunk *new_chunk, List *chunk_data_nodes) +{ + error_no_default_fn_community(); + pg_unreachable(); +} + TS_FUNCTION_INFO_V1(ts_tsl_loaded); PGDLLEXPORT Datum @@ -541,6 +548,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .dist_remote_chunk_info = error_no_default_fn_pg_community, .dist_remote_compressed_chunk_info = error_no_default_fn_pg_community, .dist_remote_hypertable_index_info = error_no_default_fn_pg_community, + .dist_update_stale_chunk_metadata = dist_update_stale_chunk_metadata_default, .validate_as_data_node = error_no_default_fn_community, .func_call_on_data_nodes = func_call_on_data_nodes_default, .chunk_get_relstats = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index f05808301..0e3a04eab 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -188,6 +188,7 @@ typedef struct CrossModuleFunctions PGFunction dist_remote_chunk_info; PGFunction dist_remote_compressed_chunk_info; PGFunction dist_remote_hypertable_index_info; + void (*dist_update_stale_chunk_metadata)(Chunk *new_chunk, List *chunk_data_nodes); void (*validate_as_data_node)(void); void (*func_call_on_data_nodes)(FunctionCallInfo fcinfo, List *data_node_oids); PGFunction distributed_exec; diff --git a/src/hypertable.c b/src/hypertable.c index 4bf9c0e59..3dd3f58f3 100644 --- a/src/hypertable.c +++ 
b/src/hypertable.c @@ -1067,12 +1067,13 @@ hypertable_chunk_store_add(const Hypertable *h, const Chunk *input_chunk) * Create a chunk for the point, given that it does not exist yet. */ Chunk * -ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point) +ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point, bool *found) { Assert(ts_subspace_store_get(h->chunk_cache, point) == NULL); Chunk *chunk = ts_chunk_create_for_point(h, point, + found, NameStr(h->fd.associated_schema_name), NameStr(h->fd.associated_table_prefix)); diff --git a/src/hypertable.h b/src/hypertable.h index 8d7b11e2e..30cbcecb7 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -135,7 +135,7 @@ extern TSDLLEXPORT int32 ts_hypertable_relid_to_id(Oid relid); extern TSDLLEXPORT Chunk *ts_hypertable_find_chunk_for_point(const Hypertable *h, const Point *point); extern TSDLLEXPORT Chunk *ts_hypertable_create_chunk_for_point(const Hypertable *h, - const Point *point); + const Point *point, bool *found); extern Oid ts_hypertable_relid(RangeVar *rv); extern TSDLLEXPORT bool ts_is_hypertable(Oid relid); extern bool ts_hypertable_has_tablespace(const Hypertable *ht, Oid tspc_oid); diff --git a/src/nodes/chunk_dispatch.c b/src/nodes/chunk_dispatch.c index 05eaef65e..51eebab01 100644 --- a/src/nodes/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch.c @@ -14,9 +14,11 @@ #include "compat/compat.h" #include "chunk_dispatch.h" #include "chunk_insert_state.h" +#include "errors.h" #include "subspace_store.h" #include "dimension.h" #include "guc.h" +#include "ts_catalog/chunk_data_node.h" ChunkDispatch * ts_chunk_dispatch_create(Hypertable *ht, EState *estate, int eflags) @@ -144,10 +146,31 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point, * locking the hypertable. This serves as a fast path for the usual case * where the chunk already exists. */ + bool found; Chunk *new_chunk = ts_hypertable_find_chunk_for_point(dispatch->hypertable, point); if (new_chunk == NULL) { - new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point); + new_chunk = ts_hypertable_create_chunk_for_point(dispatch->hypertable, point, &found); + } + else + found = true; + + /* get the filtered list of "available" DNs for this chunk but only if it's replicated */ + if (found && dispatch->hypertable->fd.replication_factor > 1) + { + List *chunk_data_nodes = + ts_chunk_data_node_scan_by_chunk_id_filter(new_chunk->fd.id, CurrentMemoryContext); + + /* + * If the chunk was not created as part of this insert, we need to check whether any + * of the chunk's data nodes are currently unavailable and in that case consider the + * chunk stale on those data nodes. Do that by removing the AN's chunk-datanode + * mapping for the unavailable data nodes. 
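+	 * For example (an illustrative scenario, not from the original comment): with replication_factor = 3 and one data node marked available=>false, the filtered scan above returns only two entries, so the condition below holds and the stale mappings are removed.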
+ */ + if (dispatch->hypertable->fd.replication_factor > list_length(chunk_data_nodes)) + ts_cm_functions->dist_update_stale_chunk_metadata(new_chunk, chunk_data_nodes); + + list_free(chunk_data_nodes); } if (NULL == new_chunk) diff --git a/src/ts_catalog/chunk_data_node.c b/src/ts_catalog/chunk_data_node.c index 78e3afd19..ec1d1eb7c 100644 --- a/src/ts_catalog/chunk_data_node.c +++ b/src/ts_catalog/chunk_data_node.c @@ -14,6 +14,9 @@ #include #include "ts_catalog/chunk_data_node.h" +#include "cache.h" +#include "hypercube.h" +#include "hypertable_cache.h" #include "scanner.h" #include "chunk.h" @@ -124,6 +127,37 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data) return SCAN_CONTINUE; } +/* return a filtered list of "available" ChunkDataNode entries */ +static ScanTupleResult +chunk_data_node_tuple_found_filter(TupleInfo *ti, void *data) +{ + List **nodes = data; + bool should_free; + HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); + Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple); + ForeignServer *server; + + server = GetForeignServerByName(NameStr(form->node_name), false); + + if (ts_data_node_is_available_by_server(server)) + { + ChunkDataNode *chunk_data_node; + MemoryContext old; + + old = MemoryContextSwitchTo(ti->mctx); + chunk_data_node = palloc(sizeof(ChunkDataNode)); + memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node)); + chunk_data_node->foreign_server_oid = server->serverid; + *nodes = lappend(*nodes, chunk_data_node); + MemoryContextSwitchTo(old); + } + + if (should_free) + heap_freetuple(tuple); + + return SCAN_CONTINUE; +} + static int ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char *node_name, bool scan_by_remote_chunk_id, @@ -210,6 +244,22 @@ ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx) return chunk_data_nodes; } +/* Returns a filtered List of available ChunkDataNode structs. 
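+ * Unlike ts_chunk_data_node_scan_by_chunk_id(), entries whose foreign server fails the ts_data_node_is_available_by_server() check are left out of the returned list.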
*/ +List * +ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, MemoryContext mctx) +{ + List *chunk_data_nodes = NIL; + + ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id, + NULL, + false, + chunk_data_node_tuple_found_filter, + &chunk_data_nodes, + AccessShareLock, + mctx); + return chunk_data_nodes; +} + static ChunkDataNode * chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name, bool scan_by_remote_chunk_id, MemoryContext mctx) diff --git a/src/ts_catalog/chunk_data_node.h b/src/ts_catalog/chunk_data_node.h index 83a35c785..ecfbef900 100644 --- a/src/ts_catalog/chunk_data_node.h +++ b/src/ts_catalog/chunk_data_node.h @@ -7,6 +7,7 @@ #define TIMESCALEDB_CHUNK_DATA_NODE_H #include "ts_catalog/catalog.h" +#include "chunk.h" #include "export.h" #include "scan_iterator.h" @@ -17,6 +18,8 @@ typedef struct ChunkDataNode } ChunkDataNode; extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx); +extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id_filter(int32 chunk_id, + MemoryContext mctx); extern TSDLLEXPORT ChunkDataNode * ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name, MemoryContext mctx); @@ -37,5 +40,4 @@ extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanItera int32 chunk_id); extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name); - #endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */ diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c index cc7d2d90a..a269917f7 100644 --- a/tsl/src/chunk.c +++ b/tsl/src/chunk.c @@ -765,3 +765,72 @@ chunk_drop_stale_chunks(PG_FUNCTION_ARGS) ts_chunk_drop_stale_chunks(node_name, chunks_array); PG_RETURN_VOID(); } +/* + * Update and refresh the DN list for a given chunk. We remove metadata for this chunk + * for unavailable DNs. + */ +void +chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes) +{ + List *serveroids = NIL, *removeoids = NIL; + ChunkDataNode *cdn; + ListCell *lc; + + /* check that at least one data node is available for this chunk on the AN */ + if (chunk_data_nodes == NIL) + ereport(ERROR, + (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES), + (errmsg("insufficient number of available data nodes"), + errhint("Increase the number of available data nodes on hypertable " + "\"%s\".", + get_rel_name(new_chunk->hypertable_relid))))); + + foreach (lc, chunk_data_nodes) + { + cdn = lfirst(lc); + serveroids = lappend_oid(serveroids, cdn->foreign_server_oid); + } + + foreach (lc, new_chunk->data_nodes) + { + cdn = lfirst(lc); + + /* + * Check whether this DN is part of chunk_data_nodes. If it is not, + * we need to remove this chunk-id-to-node-name mapping and also + * update the primary foreign server if necessary. It's possible that + * this metadata was already cleared earlier, in which case the + * data_nodes list for the chunk will be the same as the + * "serveroids" list and no unnecessary metadata update function + * calls will occur.
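+	 * For example (using illustrative node names), a chunk replicated on {dn1, dn2, dn3} with dn1 unavailable arrives here with chunk_data_nodes = {dn2, dn3}; the code below then deletes the dn1 mapping and drops dn1 from new_chunk->data_nodes.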
+ */ + if (!list_member_oid(serveroids, cdn->foreign_server_oid)) + { + chunk_update_foreign_server_if_needed(new_chunk, cdn->foreign_server_oid, false); + ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id, + NameStr(cdn->fd.node_name)); + + removeoids = lappend_oid(removeoids, cdn->foreign_server_oid); + } + } + + /* remove entries from new_chunk->data_nodes matching removeoids */ + foreach (lc, removeoids) + { + ListCell *l; + Oid serveroid = lfirst_oid(lc); + + /* contrived code for a PG12+-compatible in-place list delete */ + foreach (l, new_chunk->data_nodes) + { + cdn = lfirst(l); + + if (cdn->foreign_server_oid == serveroid) + { + new_chunk->data_nodes = list_delete_ptr(new_chunk->data_nodes, cdn); + break; + } + } + } +} diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h index 8efab4337..15d5a942e 100644 --- a/tsl/src/chunk.h +++ b/tsl/src/chunk.h @@ -20,5 +20,6 @@ extern Datum chunk_drop_stale_chunks(PG_FUNCTION_ARGS); extern void ts_chunk_drop_stale_chunks(const char *node_name, ArrayType *chunks_array); extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type); extern Datum chunk_create_replica_table(PG_FUNCTION_ARGS); +extern void chunk_update_stale_metadata(Chunk *new_chunk, List *chunk_data_nodes); #endif /* TIMESCALEDB_TSL_CHUNK_H */ diff --git a/tsl/src/chunk_api.c b/tsl/src/chunk_api.c index c8d8c060b..34ca99a0a 100644 --- a/tsl/src/chunk_api.c +++ b/tsl/src/chunk_api.c @@ -474,6 +474,14 @@ chunk_api_create_on_data_nodes(const Chunk *chunk, const Hypertable *ht, ListCell *lc; TupleDesc tupdesc; AttInMetadata *attinmeta; + + /* + * In case of "unavailable" data nodes, the chunk->data_nodes list is already pruned + * and doesn't contain them, so this chunk creation will never happen on such + * nodes. By the same logic, the metadata update on the AN for the chunk-to-data-node + * mappings will only happen for the listed "live" DNs and not for the "unavailable" + * ones. + */ List *target_data_nodes = data_nodes ? data_nodes : chunk->data_nodes; get_create_chunk_result_type(&tupdesc); diff --git a/tsl/src/fdw/modify_exec.c b/tsl/src/fdw/modify_exec.c index 1bed493c6..32956a0b4 100644 --- a/tsl/src/fdw/modify_exec.c +++ b/tsl/src/fdw/modify_exec.c @@ -26,6 +26,8 @@ #include "scan_plan.h" #include "modify_exec.h" +#include "modify_plan.h" +#include "tsl/src/chunk.h" /* * This enum describes what's kept in the fdw_private list for a ModifyTable @@ -79,7 +81,9 @@ typedef struct TsFdwModifyState AttrNumber ctid_attno; /* attnum of input resjunk ctid column */ bool prepared; - int num_data_nodes; + int num_data_nodes; /* number of "available" datanodes */ + int num_all_data_nodes; /* number of all datanodes assigned to this "rel" */ + List *stale_data_nodes; /* DNs marked stale for this chunk */ StmtParams *stmt_params; /* prepared statement parameters */ TsFdwDataNodeState data_nodes[FLEXIBLE_ARRAY_MEMBER]; } TsFdwModifyState; @@ -110,11 +114,22 @@ create_foreign_modify(EState *estate, Relation rel, CmdType operation, Oid check ListCell *lc; Oid user_id = OidIsValid(check_as_user) ? check_as_user : GetUserId(); int i = 0; - int num_data_nodes = server_id_list == NIL ? 1 : list_length(server_id_list); + int num_data_nodes, num_all_data_nodes; + int32 hypertable_id = ts_chunk_get_hypertable_id_by_relid(rel->rd_id); + List *all_replicas = NIL, *avail_replicas = NIL; - /* Begin constructing TsFdwModifyState.
*/ - fmstate = (TsFdwModifyState *) palloc0(TS_FDW_MODIFY_STATE_SIZE(num_data_nodes)); - fmstate->rel = rel; + if (hypertable_id == INVALID_HYPERTABLE_ID) + { + num_data_nodes = num_all_data_nodes = 1; + } + else + { + int32 chunk_id = ts_chunk_get_id_by_relid(rel->rd_id); + + all_replicas = ts_chunk_data_node_scan_by_chunk_id(chunk_id, CurrentMemoryContext); + avail_replicas = ts_chunk_data_node_scan_by_chunk_id_filter(chunk_id, CurrentMemoryContext); + num_all_data_nodes = list_length(all_replicas); + } /* * Identify which user to do the remote access as. This should match what @@ -131,6 +146,8 @@ create_foreign_modify(EState *estate, Relation rel, CmdType operation, Oid check * in the FDW planning callback. */ + fmstate = + (TsFdwModifyState *) palloc0(TS_FDW_MODIFY_STATE_SIZE(list_length(server_id_list))); foreach (lc, server_id_list) { Oid server_id = lfirst_oid(lc); @@ -138,26 +155,53 @@ create_foreign_modify(EState *estate, Relation rel, CmdType operation, Oid check initialize_fdw_data_node_state(&fmstate->data_nodes[i++], id); } + num_data_nodes = list_length(server_id_list); + Assert(num_data_nodes == list_length(avail_replicas)); } else { /* * If there is no chunk insert state and no data nodes from planning, * this is an INSERT, UPDATE, or DELETE on a standalone foreign table. - * We must get the data node from the foreign table's metadata. + * + * If it's a regular foreign table then we must get the data node from + * the foreign table's metadata. + * + * Otherwise, we use the list of "available" DNs from earlier */ - ForeignTable *table = GetForeignTable(rel->rd_id); - TSConnectionId id = remote_connection_id(table->serverid, user_id); + if (hypertable_id == INVALID_HYPERTABLE_ID) + { + ForeignTable *table = GetForeignTable(rel->rd_id); + TSConnectionId id = remote_connection_id(table->serverid, user_id); - initialize_fdw_data_node_state(&fmstate->data_nodes[0], id); + Assert(num_data_nodes == 1 && num_all_data_nodes == 1); + fmstate = (TsFdwModifyState *) palloc0(TS_FDW_MODIFY_STATE_SIZE(num_data_nodes)); + initialize_fdw_data_node_state(&fmstate->data_nodes[0], id); + } + else + { + /* we use only the available replicas */ + fmstate = + (TsFdwModifyState *) palloc0(TS_FDW_MODIFY_STATE_SIZE(list_length(avail_replicas))); + foreach (lc, avail_replicas) + { + ChunkDataNode *node = lfirst(lc); + TSConnectionId id = remote_connection_id(node->foreign_server_oid, user_id); + + initialize_fdw_data_node_state(&fmstate->data_nodes[i++], id); + } + num_data_nodes = list_length(avail_replicas); + } } /* Set up remote query information. */ + fmstate->rel = rel; fmstate->query = query; fmstate->target_attrs = target_attrs; fmstate->has_returning = has_returning; fmstate->prepared = false; /* PREPARE will happen later */ fmstate->num_data_nodes = num_data_nodes; + fmstate->num_all_data_nodes = num_all_data_nodes; /* Prepare for input conversion of RETURNING results. */ if (fmstate->has_returning) @@ -393,6 +437,52 @@ response_type(AttConvInMetadata *att_conv_metadata) return att_conv_metadata == NULL || att_conv_metadata->binary ? 
FORMAT_BINARY : FORMAT_TEXT; } +static void +fdw_chunk_update_stale_metadata(TsFdwModifyState *fmstate) +{ + List *all_data_nodes; + Relation rel = fmstate->rel; + + if (fmstate->num_all_data_nodes == fmstate->num_data_nodes) + return; + + if (fmstate->num_all_data_nodes > fmstate->num_data_nodes) + { + Chunk *chunk = ts_chunk_get_by_relid(rel->rd_id, true); + /* get filtered list */ + List *serveroids = get_chunk_data_nodes(rel->rd_id); + ListCell *lc; + Assert(list_length(serveroids) == fmstate->num_data_nodes); + + all_data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk->fd.id, CurrentMemoryContext); + Assert(list_length(all_data_nodes) == fmstate->num_all_data_nodes); + + foreach (lc, all_data_nodes) + { + ChunkDataNode *cdn = lfirst(lc); + /* + * Check whether this DN is part of serveroids. If it is not, + * we need to remove this chunk-id-to-node-name mapping and also + * update the primary foreign server if necessary. It's possible + * that this metadata was already cleared earlier, but we have no + * way of knowing that here. + */ + if (!list_member_oid(serveroids, cdn->foreign_server_oid) && + !list_member_oid(fmstate->stale_data_nodes, cdn->foreign_server_oid)) + { + chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false); + ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id, + NameStr(cdn->fd.node_name)); + + /* append this DN serveroid to the list of DNs marked stale for this chunk */ + fmstate->stale_data_nodes = + lappend_oid(fmstate->stale_data_nodes, cdn->foreign_server_oid); + } + } + } +} + TupleTableSlot * fdw_exec_foreign_insert(TsFdwModifyState *fmstate, EState *estate, TupleTableSlot *slot, TupleTableSlot *planslot) @@ -458,6 +548,14 @@ fdw_exec_foreign_insert(TsFdwModifyState *fmstate, EState *estate, TupleTableSlo */ pfree(reqset); + + /* + * If rows are affected on DNs and a DN was excluded because of being + * "unavailable", then we need to update metadata on the AN to mark + * this chunk as "stale" for that "unavailable" DN. + */ + if (n_rows > 0 && fmstate->num_all_data_nodes > fmstate->num_data_nodes) + fdw_chunk_update_stale_metadata(fmstate); + /* Return NULL if nothing was inserted on the remote end */ return (n_rows > 0) ? slot : NULL; } @@ -544,6 +642,14 @@ fdw_exec_foreign_update_or_delete(TsFdwModifyState *fmstate, EState *estate, Tup pfree(reqset); stmt_params_reset(params); + + /* + * If rows are affected on DNs and a DN was excluded because of being + * "unavailable", then we need to update metadata on the AN to mark + * this chunk as "stale" for that "unavailable" DN. + */ + if (n_rows > 0 && fmstate->num_all_data_nodes > fmstate->num_data_nodes) + fdw_chunk_update_stale_metadata(fmstate); + /* Return NULL if nothing was updated on the remote end */ return (n_rows > 0) ?
slot : NULL; } diff --git a/tsl/src/fdw/modify_plan.c b/tsl/src/fdw/modify_plan.c index 08a474f83..5e8f6d4b5 100644 --- a/tsl/src/fdw/modify_plan.c +++ b/tsl/src/fdw/modify_plan.c @@ -10,6 +10,7 @@ #include #include "deparse.h" +#include "errors.h" #include "modify_plan.h" #include "ts_catalog/chunk_data_node.h" @@ -51,16 +52,30 @@ get_update_attrs(RangeTblEntry *rte) return attrs; } -static List * +/* get a list of "live" DNs associated with this chunk */ +List * get_chunk_data_nodes(Oid relid) { int32 chunk_id = ts_chunk_get_id_by_relid(relid); Assert(chunk_id != 0); - List *chunk_data_nodes = ts_chunk_data_node_scan_by_chunk_id(chunk_id, CurrentMemoryContext); + List *chunk_data_nodes = + ts_chunk_data_node_scan_by_chunk_id_filter(chunk_id, CurrentMemoryContext); List *serveroids = NIL; ListCell *lc; + /* check that at least one data node is available for this chunk */ + if (chunk_data_nodes == NIL) + { + Hypertable *ht = ts_hypertable_get_by_id(ts_chunk_get_hypertable_id_by_relid(relid)); + + ereport(ERROR, + (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES), + (errmsg("insufficient number of available data nodes"), + errhint("Increase the number of available data nodes on hypertable \"%s\".", + get_rel_name(ht->main_table_relid))))); + } + foreach (lc, chunk_data_nodes) { ChunkDataNode *data_node = lfirst(lc); diff --git a/tsl/src/fdw/modify_plan.h b/tsl/src/fdw/modify_plan.h index 87b17ddb2..c53506420 100644 --- a/tsl/src/fdw/modify_plan.h +++ b/tsl/src/fdw/modify_plan.h @@ -10,5 +10,6 @@ extern List *fdw_plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relation, int subplan_index); +extern List *get_chunk_data_nodes(Oid relid); #endif /* TIMESCALEDB_TSL_FDW_MODIFY_PLAN_H */ diff --git a/tsl/src/fdw/scan_exec.c b/tsl/src/fdw/scan_exec.c index aeeb75240..78b74e404 100644 --- a/tsl/src/fdw/scan_exec.c +++ b/tsl/src/fdw/scan_exec.c @@ -281,18 +281,23 @@ fdw_scan_init(ScanState *ss, TsFdwScanState *fsstate, Bitmapset *scanrelids, Lis List *fdw_exprs, int eflags) { int num_params; + Oid server_oid; + ForeignServer *server; if ((eflags & EXEC_FLAG_EXPLAIN_ONLY) && !ts_guc_enable_remote_explain) return; + /* Check if the server is "available" for use before setting up a connection to it */ + server_oid = intVal(list_nth(fdw_private, FdwScanPrivateServerId)); + server = GetForeignServer(server_oid); + if (!ts_data_node_is_available_by_server(server)) + ereport(ERROR, (errmsg("data node \"%s\" is not available", server->servername))); + /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = get_connection(ss, - intVal(list_nth(fdw_private, FdwScanPrivateServerId)), - scanrelids, - fdw_exprs); + fsstate->conn = get_connection(ss, server_oid, scanrelids, fdw_exprs); /* Get private info created by planner functions.
*/ fsstate->query = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); diff --git a/tsl/src/init.c b/tsl/src/init.c index 1bd7f1625..655e3b67e 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -224,6 +224,7 @@ CrossModuleFunctions tsl_cm_functions = { .dist_remote_chunk_info = dist_util_remote_chunk_info, .dist_remote_compressed_chunk_info = dist_util_remote_compressed_chunk_info, .dist_remote_hypertable_index_info = dist_util_remote_hypertable_index_info, + .dist_update_stale_chunk_metadata = chunk_update_stale_metadata, .validate_as_data_node = validate_data_node_settings, .distributed_exec = ts_dist_cmd_exec, .create_distributed_restore_point = create_distributed_restore_point, diff --git a/tsl/src/remote/dist_copy.c b/tsl/src/remote/dist_copy.c index 52077bbef..40c3c9459 100644 --- a/tsl/src/remote/dist_copy.c +++ b/tsl/src/remote/dist_copy.c @@ -1345,6 +1345,7 @@ remote_copy_process_and_send_data(RemoteCopyContext *context) for (int row_in_batch = 0; row_in_batch < n; row_in_batch++) { Point *point = context->batch_points[row_in_batch]; + bool found; Chunk *chunk = ts_hypertable_find_chunk_for_point(ht, point); if (chunk == NULL) @@ -1360,7 +1361,33 @@ remote_copy_process_and_send_data(RemoteCopyContext *context) end_copy_on_success(&context->connection_state); did_end_copy = true; } - chunk = ts_hypertable_create_chunk_for_point(ht, point); + chunk = ts_hypertable_create_chunk_for_point(ht, point, &found); + } + else + found = true; + + /* get the filtered list of "available" DNs for this chunk but only if it's replicated */ + if (found && ht->fd.replication_factor > 1) + { + List *chunk_data_nodes = + ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id, CurrentMemoryContext); + + /* + * If the chunk was not created as part of this insert, we need to check whether any + * of the chunk's data nodes are currently unavailable and in that case consider the + * chunk stale on those data nodes. Do that by removing the AN's chunk-datanode + * mapping for the unavailable data nodes. + * + * Note that the metadata will only get updated once since we assign the chunk's + * data_node list to the list of available DNs the first time this + * dist_update_stale_chunk_metadata API gets called. So both chunk_data_nodes and + * chunk->data_nodes will point to the same list and no subsequent metadata updates will + * occur. 
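+	 * (ts_chunk_dispatch_get_chunk_insert_state() in src/nodes/chunk_dispatch.c applies the same check on the row-by-row INSERT path.)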
+ */ + if (ht->fd.replication_factor > list_length(chunk_data_nodes)) + ts_cm_functions->dist_update_stale_chunk_metadata(chunk, chunk_data_nodes); + + list_free(chunk_data_nodes); } /* diff --git a/tsl/src/remote/dist_ddl.c b/tsl/src/remote/dist_ddl.c index 8ee799387..50ee22234 100644 --- a/tsl/src/remote/dist_ddl.c +++ b/tsl/src/remote/dist_ddl.c @@ -238,7 +238,19 @@ dist_ddl_state_add_data_node_list_from_table(const char *schema, const char *nam static void dist_ddl_state_add_data_node_list_from_ht(Hypertable *ht) { + ListCell *lc; + dist_ddl_state.data_node_list = ts_hypertable_get_data_node_name_list(ht); + + /* Check that all DNs are "available" for this DDL operation, fail otherwise */ + foreach (lc, dist_ddl_state.data_node_list) + { + const char *data_node_name = lfirst(lc); + ForeignServer *server = GetForeignServerByName(data_node_name, false); + + if (!ts_data_node_is_available_by_server(server)) + ereport(ERROR, (errmsg("some data nodes are not available for DDL commands"))); + } } static void diff --git a/tsl/test/expected/data_node.out b/tsl/test/expected/data_node.out index 8b1708b18..d6c5670f1 100644 --- a/tsl/test/expected/data_node.out +++ b/tsl/test/expected/data_node.out @@ -1769,7 +1769,7 @@ SELECT * FROM chunk_query_data_node; -- replication \set ON_ERROR_STOP 0 SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; -ERROR: could not connect to "data_node_1" +ERROR: data node "data_node_1" is not available \set ON_ERROR_STOP 1 SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; time | location @@ -1789,19 +1789,163 @@ SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; Sat Jan 01 00:00:00 2022 PST | 1 (1 row) --- inserts should fail if going to chunks that exist on the --- unavailable data node -\set ON_ERROR_STOP 0 +-- inserts should continue to work and should go to the "live" +-- datanodes INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); -ERROR: could not connect to "data_node_1" +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); -ERROR: could not connect to "data_node_1" -\set ON_ERROR_STOP 1 --- inserts should work if going to a new chunk +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:05', 1, 1); +-- Check that the metadata on the AN removes the association with +-- the "unavailable" DN for existing chunks that are being written into +-- above +SELECT * FROM chunk_query_data_node WHERE hypertable_name IN ('hyper3', 'hyper_1dim'); + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 +(6 rows) + +-- Also, inserts should work if going to a new chunk INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); WARNING: insufficient number of data nodes +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:05', 1, 
1); INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); WARNING: insufficient number of data nodes +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:05', 1, 1); +-- Also check that new chunks only use the "available" DNs +SELECT * FROM chunk_query_data_node WHERE hypertable_name IN ('hyper3', 'hyper_1dim'); + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2 +(8 rows) + +-- Updates/Deletes should also work +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:00'; + time | location | temp +------------------------------+----------+------ + Mon Jan 03 00:00:00 2022 PST | 1 | 10 + Mon Jan 03 00:00:00 2022 PST | 2 | 10 +(2 rows) + +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:05'; + time | location | temp +------------------------------+----------+------ + Mon Jan 03 00:00:05 2022 PST | 1 | 10 +(1 row) + +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; + time | location | temp +------------------------------+----------+------ + Mon Jan 03 00:00:00 2022 PST | 2 | 10 + Mon Jan 03 00:00:00 2022 PST | 1 | 10 +(2 rows) + +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; + time | location | temp +------------------------------+----------+------ + Mon Jan 03 00:00:05 2022 PST | 1 | 10 +(1 row) + +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:00'; + time | location | temp +------+----------+------ +(0 rows) + +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:05'; + time | location | temp +------+----------+------ +(0 rows) + +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; + time | location | temp +------+----------+------ +(0 rows) + +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; + time | location | temp +------+----------+------ +(0 rows) + +-- Inserts directly into chunks using FDW should also work and should go to the +-- available DNs appropriately +INSERT INTO _timescaledb_internal._dist_hyper_12_24_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +INSERT INTO _timescaledb_internal._dist_hyper_13_25_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +SELECT * FROM 
test.remote_exec(ARRAY['data_node_2', 'data_node_3'], $$ SELECT * FROM _timescaledb_internal._dist_hyper_12_24_chunk WHERE time = '2022-01-11 00:00:00'; $$); +NOTICE: [data_node_2]: SELECT * FROM _timescaledb_internal._dist_hyper_12_24_chunk WHERE time = '2022-01-11 00:00:00' +NOTICE: [data_node_2]: +time |location|temp +----------------------------+--------+---- +Tue Jan 11 00:00:00 2022 PST| 1| 1 +(1 row) + + +NOTICE: [data_node_3]: SELECT * FROM _timescaledb_internal._dist_hyper_12_24_chunk WHERE time = '2022-01-11 00:00:00' +NOTICE: [data_node_3]: +time |location|temp +----------------------------+--------+---- +Tue Jan 11 00:00:00 2022 PST| 1| 1 +(1 row) + + + remote_exec +------------- + +(1 row) + +SELECT * FROM test.remote_exec(ARRAY['data_node_2', 'data_node_3'], $$ SELECT * FROM _timescaledb_internal._dist_hyper_13_25_chunk WHERE time = '2022-01-11 00:00:00'; $$); +NOTICE: [data_node_2]: SELECT * FROM _timescaledb_internal._dist_hyper_13_25_chunk WHERE time = '2022-01-11 00:00:00' +NOTICE: [data_node_2]: +time |location|temp +----------------------------+--------+---- +Tue Jan 11 00:00:00 2022 PST| 1| 1 +(1 row) + + +NOTICE: [data_node_3]: SELECT * FROM _timescaledb_internal._dist_hyper_13_25_chunk WHERE time = '2022-01-11 00:00:00' +NOTICE: [data_node_3]: +time |location|temp +----------------------------+--------+---- +Tue Jan 11 00:00:00 2022 PST| 1| 1 +(1 row) + + + remote_exec +------------- + +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name IN ('hyper3', 'hyper_1dim'); + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2 +(8 rows) + SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks WHERE hypertable_name IN ('hyper3', 'hyper_1dim') AND range_start::timestamptz <= '2022-01-10 00:00:00' @@ -1813,17 +1957,80 @@ ORDER BY 1, 2; hyper_1dim | _dist_hyper_13_25_chunk | {data_node_2,data_node_3} (2 rows) +-- DDL should error out even if one DN is unavailable +\set ON_ERROR_STOP 0 +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL commands +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL commands +\set ON_ERROR_STOP 1 +-- Mark all DNs unavailable. 
Metadata should still retain last DN but all +-- activity should fail +SELECT * FROM alter_data_node('data_node_2', available=>false); +WARNING: could not switch data node on 2 chunks + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_2 | localhost | 55432 | db_data_node_2 | f +(1 row) + +SELECT * FROM alter_data_node('data_node_3', available=>false); +WARNING: could not switch data node on 11 chunks + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_3 | localhost | 55432 | db_data_node_3 | f +(1 row) + +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +ERROR: insufficient number of available data nodes +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +ERROR: insufficient number of available data nodes +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of available data nodes +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of available data nodes +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of available data nodes +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +ERROR: insufficient number of available data nodes +SELECT count(*) FROM hyper3; +ERROR: data node "data_node_3" is not available +SELECT count(*) FROM hyper_1dim; +ERROR: data node "data_node_3" is not available +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL commands +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +ERROR: some data nodes are not available for DDL commands +\set ON_ERROR_STOP 1 -- re-enable the data node and the chunks should "switch back" to -- using the data node. 
However, only the chunks for which the node is -- "primary" should switch to using the data node for queries ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; WARNING: you need to manually restart any running background workers after this command SELECT * FROM alter_data_node('data_node_1', available=>true); +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes node_name | host | port | database | available -------------+-----------+-------+----------------+----------- data_node_1 | localhost | 55432 | db_data_node_1 | t (1 row) +SELECT * FROM alter_data_node('data_node_2', available=>true); +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes +WARNING: insufficient number of data nodes + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_2 | localhost | 55432 | db_data_node_2 | t +(1 row) + +SELECT * FROM alter_data_node('data_node_3', available=>true); + node_name | host | port | database | available +-------------+-----------+-------+----------------+----------- + data_node_3 | localhost | 55432 | db_data_node_3 | t +(1 row) + SELECT * FROM chunk_query_data_node; hypertable_name | chunk | data_nodes | default_data_node -----------------+-----------------------------------------------+---------------------------------------+------------------- @@ -1833,14 +2040,14 @@ SELECT * FROM chunk_query_data_node; hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1 hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 - hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 - hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_2,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_2,data_node_3} | data_node_2 hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 - hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_3 hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 - hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 - hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 - hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_3 (14 rows) --queries should work again on all tables @@ -1868,6 +2075,9 @@ SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; Sat Jan 01 00:00:00 2022 PST | 1 (1 row) +-- DDL should also work again +ALTER TABLE hyper3 ADD COLUMN temp2 
int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; -- save old port so that we can restore connectivity after we test -- changing the connection information for the data node WITH options AS ( @@ -1878,12 +2088,12 @@ WITH options AS ( SELECT split_part(opt, '=', 2) AS old_port FROM options WHERE opt LIKE 'port%' \gset -- also test altering host, port and database -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; node_name | options -------------+------------------------------------------------------------------ - data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} - data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2,available=true} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3,available=true} (3 rows) SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', port=>8989, database=>'new_db'); @@ -1892,12 +2102,12 @@ SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', p data_node_1 | foo.bar | 8989 | new_db | t (1 row) -SELECT node_name, options FROM timescaledb_information.data_nodes; - node_name | options --------------+------------------------------------------------------- +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; + node_name | options +-------------+------------------------------------------------------------------ data_node_1 | {host=foo.bar,port=8989,dbname=new_db,available=true} - data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} - data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2,available=true} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3,available=true} (3 rows) -- just show current options: @@ -1926,12 +2136,12 @@ SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, data_node_1 | localhost | 55432 | db_data_node_1 | t (1 row) -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; node_name | options -------------+------------------------------------------------------------------ data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} - data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} - data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2,available=true} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3,available=true} (3 rows) DROP TABLE hyper1; diff --git a/tsl/test/expected/dist_copy_available_dns.out b/tsl/test/expected/dist_copy_available_dns.out new file mode 100644 index 000000000..6720e64e0 --- /dev/null +++ b/tsl/test/expected/dist_copy_available_dns.out @@ -0,0 +1,343 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- Test distributed COPY with a bigger data set to help find rare effects. 
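+-- The scenarios below toggle data nodes with alter_data_node(..., available=>false/true) and verify that COPY and INSERT keep routing writes to the remaining "live" DNs while the AN prunes stale chunk-to-data-node mappings.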
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +\set DN_DBNAME_1 :TEST_DBNAME _1 +\set DN_DBNAME_2 :TEST_DBNAME _2 +\set DN_DBNAME_3 :TEST_DBNAME _3 +SELECT 1 FROM add_data_node('data_node_1', host => 'localhost', + database => :'DN_DBNAME_1'); + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM add_data_node('data_node_2', host => 'localhost', + database => :'DN_DBNAME_2'); + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM add_data_node('data_node_3', host => 'localhost', + database => :'DN_DBNAME_3'); + ?column? +---------- + 1 +(1 row) + +GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC; +-- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_1; +SET ROLE :ROLE_1; +CREATE VIEW chunk_query_data_node AS +SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node + FROM timescaledb_information.chunks ch + INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid) + INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid) + ORDER BY 1 DESC, 2 DESC; +create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text, type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text, locality text, town text, district text, country text, category smallint); +-- Aim to about 100 partitions, the data is from 1995 to 2022. +select create_distributed_hypertable('uk_price_paid', 'date', chunk_time_interval => interval '270 day', replication_factor=>3); +NOTICE: adding not-null constraint to column "date" + create_distributed_hypertable +------------------------------- + (1,public,uk_price_paid,t) +(1 row) + +create table uk_price_paid_space2(like uk_price_paid); +select create_distributed_hypertable('uk_price_paid_space2', 'date', 'postcode2', 2, chunk_time_interval => interval '270 day', replication_factor => 2); +WARNING: insufficient number of partitions for dimension "postcode2" + create_distributed_hypertable +----------------------------------- + (2,public,uk_price_paid_space2,t) +(1 row) + +\copy uk_price_paid_space2 from program 'zcat < data/prices-10k-random-1.tsv.gz'; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_space2' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +----------------------+----------------------------------------------+---------------------------+------------------- + uk_price_paid_space2 | _timescaledb_internal._dist_hyper_2_76_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_space2 | _timescaledb_internal._dist_hyper_2_75_chunk | {data_node_1,data_node_2} | data_node_1 + uk_price_paid_space2 | _timescaledb_internal._dist_hyper_2_74_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_space2 | _timescaledb_internal._dist_hyper_2_73_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_space2 | _timescaledb_internal._dist_hyper_2_72_chunk | {data_node_1,data_node_2} | data_node_1 +(5 rows) + +set timescaledb.max_open_chunks_per_insert = 1; +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*) from uk_price_paid; + count +------- + 10000 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node 
+-----------------+-----------------------------------------------+---------------------------------------+------------------- + uk_price_paid | _timescaledb_internal._dist_hyper_1_114_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_113_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_112_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_111_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_110_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 +(5 rows) + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>false); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_1 | localhost | 55432 | db_dist_copy_available_dns_1 | f +(1 row) + +SET ROLE :ROLE_1; +set timescaledb.max_open_chunks_per_insert = 2; +-- we will write to the same set of chunks and update AN metadata for down DN +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*) from uk_price_paid; + count +------- + 20000 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------+------------------- + uk_price_paid | _timescaledb_internal._dist_hyper_1_114_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_113_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_112_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_111_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_110_chunk | {data_node_2,data_node_3} | data_node_2 +(5 rows) + +set timescaledb.max_open_chunks_per_insert = 1117; +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>true); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_1 | localhost | 55432 | db_dist_copy_available_dns_1 | t +(1 row) + +SET ROLE :ROLE_1; +TRUNCATE uk_price_paid; +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*) from uk_price_paid; + count +------- + 10000 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + uk_price_paid | _timescaledb_internal._dist_hyper_1_152_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_151_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_150_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_149_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_148_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 +(5 
rows) + +select hypertable_name, replication_factor from timescaledb_information.hypertables +where hypertable_name like 'uk_price_paid%' order by hypertable_name; + hypertable_name | replication_factor +----------------------+-------------------- + uk_price_paid | 3 + uk_price_paid_space2 | 2 +(2 rows) + +-- 0, 1, 2 rows +\copy uk_price_paid from stdin +select count(*) from uk_price_paid; + count +------- + 10000 +(1 row) + +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz | head -1'; +select count(*) from uk_price_paid; + count +------- + 10001 +(1 row) + +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz | head -2'; +select count(*) from uk_price_paid; + count +------- + 10003 +(1 row) + +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; + count | sum | ?column? +-------+------------+---------- + 10003 | 2055811013 | 205519 +(1 row) + +-- Make binary file. +\copy (select * from uk_price_paid) to 'prices-10k.pgbinary' with (format binary); +-- Binary input with binary data transfer. +set timescaledb.enable_connection_binary_data = true; +set timescaledb.dist_copy_transfer_format = 'binary'; +create table uk_price_paid_bin(like uk_price_paid); +select create_distributed_hypertable('uk_price_paid_bin', 'date', 'postcode2', + chunk_time_interval => interval '90 day', replication_factor => 3); + create_distributed_hypertable +-------------------------------- + (3,public,uk_price_paid_bin,t) +(1 row) + +\copy uk_price_paid_bin from 'prices-10k.pgbinary' with (format binary); +select count(*), sum(price), sum(price) / count(*) from uk_price_paid_bin; + count | sum | ?column? +-------+------------+---------- + 10003 | 2055811013 | 205519 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_bin' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-------------------+-----------------------------------------------+---------------------------------------+------------------- + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_485_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_484_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_483_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_482_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_481_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 +(5 rows) + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>false); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_1 | localhost | 55432 | db_dist_copy_available_dns_1 | f +(1 row) + +SET ROLE :ROLE_1; +-- Text input with explicit format option and binary data transfer. This will +-- update AN metadata for the down DN +\copy uk_price_paid_bin from program 'zcat < data/prices-10k-random-1.tsv.gz' with (format text); +select count(*), sum(price), sum(price) / count(*) from uk_price_paid_bin; + count | sum | ?column? 
+-------+------------+---------- + 20003 | 4111499026 | 205544 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_bin' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-------------------+-----------------------------------------------+---------------------------+------------------- + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_485_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_484_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_483_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_482_chunk | {data_node_2,data_node_3} | data_node_3 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_481_chunk | {data_node_2,data_node_3} | data_node_2 +(5 rows) + +-- Text input with text data transfer. +\copy uk_price_paid_bin from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid_bin; + count | sum | ?column? +-------+------------+---------- + 30003 | 6167187039 | 205552 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_bin' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-------------------+-----------------------------------------------+---------------------------+------------------- + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_485_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_484_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_483_chunk | {data_node_2,data_node_3} | data_node_2 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_482_chunk | {data_node_2,data_node_3} | data_node_3 + uk_price_paid_bin | _timescaledb_internal._dist_hyper_3_481_chunk | {data_node_2,data_node_3} | data_node_2 +(5 rows) + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>true); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_1 | localhost | 55432 | db_dist_copy_available_dns_1 | t +(1 row) + +SET ROLE :ROLE_1; +TRUNCATE uk_price_paid; +SET timescaledb.enable_distributed_insert_with_copy=false; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; + count | sum | ?column? 
+-------+------------+---------- + 30003 | 6167187039 | 205552 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + uk_price_paid | _timescaledb_internal._dist_hyper_1_523_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_522_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + uk_price_paid | _timescaledb_internal._dist_hyper_1_521_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_520_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_519_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 +(5 rows) + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_2', available=>false); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_2 | localhost | 55432 | db_dist_copy_available_dns_2 | f +(1 row) + +SET ROLE :ROLE_1; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; + count | sum | ?column? +-------+-------------+---------- + 60006 | 12334374078 | 205552 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------+------------------- + uk_price_paid | _timescaledb_internal._dist_hyper_1_523_chunk | {data_node_1,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_522_chunk | {data_node_1,data_node_3} | data_node_3 + uk_price_paid | _timescaledb_internal._dist_hyper_1_521_chunk | {data_node_1,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_520_chunk | {data_node_1,data_node_3} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_519_chunk | {data_node_1,data_node_3} | data_node_1 +(5 rows) + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_2', available=>true); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_2 | localhost | 55432 | db_dist_copy_available_dns_2 | t +(1 row) + +SET ROLE :ROLE_1; +truncate uk_price_paid; +SET timescaledb.enable_distributed_insert_with_copy=true; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_3', available=>false); + node_name | host | port | database | available +-------------+-----------+-------+------------------------------+----------- + data_node_3 | localhost | 55432 | db_dist_copy_available_dns_3 | f +(1 row) + +SET ROLE :ROLE_1; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; + count | sum | ?column? 
+-------+-------------+---------- + 60006 | 12334374078 | 205552 +(1 row) + +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------+------------------- + uk_price_paid | _timescaledb_internal._dist_hyper_1_561_chunk | {data_node_1,data_node_2} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_560_chunk | {data_node_1,data_node_2} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_559_chunk | {data_node_1,data_node_2} | data_node_1 + uk_price_paid | _timescaledb_internal._dist_hyper_1_558_chunk | {data_node_1,data_node_2} | data_node_2 + uk_price_paid | _timescaledb_internal._dist_hyper_1_557_chunk | {data_node_1,data_node_2} | data_node_2 +(5 rows) + +-- Teardown +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +DROP DATABASE :DN_DBNAME_1; +DROP DATABASE :DN_DBNAME_2; +DROP DATABASE :DN_DBNAME_3; diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 6dd63edc0..186e7eb20 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -81,6 +81,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug) dist_api_calls.sql dist_commands.sql dist_compression.sql + dist_copy_available_dns.sql dist_copy_format_long.sql dist_copy_long.sql dist_ddl.sql diff --git a/tsl/test/sql/data_node.sql b/tsl/test/sql/data_node.sql index f66260b80..44b6bea12 100644 --- a/tsl/test/sql/data_node.sql +++ b/tsl/test/sql/data_node.sql @@ -865,29 +865,89 @@ SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; - --- inserts should fail if going to chunks that exist on the --- unavailable data node -\set ON_ERROR_STOP 0 +-- inserts should continue to work and should go to the "live" +-- data nodes INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); -\set ON_ERROR_STOP 1 +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:05', 1, 1); --- inserts should work if going to a new chunk +-- Check that the metadata on the AN removes the association with +-- the "unavailable" DN for the existing chunks that are written to +-- above +SELECT * FROM chunk_query_data_node WHERE hypertable_name IN ('hyper3', 'hyper_1dim'); + +-- Also, inserts should work if going to a new chunk INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:05', 1, 1); INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:05', 1, 1); +-- Also check that new chunks only use the "available" DNs +SELECT * FROM chunk_query_data_node WHERE hypertable_name IN ('hyper3', 'hyper_1dim'); + +-- Updates/Deletes should also work +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:05'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM
hyper3 WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +SELECT * FROM hyper3 WHERE time = '2022-01-03 00:00:05'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +SELECT * FROM hyper_1dim WHERE time = '2022-01-03 00:00:05'; + +-- Inserts directly into chunks using FDW should also work and should go to the +-- available DNs appropriately +INSERT INTO _timescaledb_internal._dist_hyper_12_24_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +INSERT INTO _timescaledb_internal._dist_hyper_13_25_chunk VALUES ('2022-01-11 00:00:00', 1, 1); +SELECT * FROM test.remote_exec(ARRAY['data_node_2', 'data_node_3'], $$ SELECT * FROM _timescaledb_internal._dist_hyper_12_24_chunk WHERE time = '2022-01-11 00:00:00'; $$); +SELECT * FROM test.remote_exec(ARRAY['data_node_2', 'data_node_3'], $$ SELECT * FROM _timescaledb_internal._dist_hyper_13_25_chunk WHERE time = '2022-01-11 00:00:00'; $$); + +SELECT * FROM chunk_query_data_node WHERE hypertable_name IN ('hyper3', 'hyper_1dim'); SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks WHERE hypertable_name IN ('hyper3', 'hyper_1dim') AND range_start::timestamptz <= '2022-01-10 00:00:00' AND range_end::timestamptz > '2022-01-10 00:00:00' ORDER BY 1, 2; +-- DDL should error out even if one DN is unavailable +\set ON_ERROR_STOP 0 +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +\set ON_ERROR_STOP 1 + +-- Mark all DNs unavailable. Metadata should still retain the last DN, but all +-- activity should fail +SELECT * FROM alter_data_node('data_node_2', available=>false); +SELECT * FROM alter_data_node('data_node_3', available=>false); +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +UPDATE hyper3 SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +UPDATE hyper_1dim SET temp = 10 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper3 WHERE time = '2022-01-03 00:00:00'; +DELETE FROM hyper_1dim WHERE time = '2022-01-03 00:00:00'; +SELECT count(*) FROM hyper3; +SELECT count(*) FROM hyper_1dim; +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; +\set ON_ERROR_STOP 1 + -- re-enable the data node and the chunks should "switch back" to -- using the data node.
However, only the chunks for which the node is -- "primary" should switch to using the data node for queries ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; SELECT * FROM alter_data_node('data_node_1', available=>true); +SELECT * FROM alter_data_node('data_node_2', available=>true); +SELECT * FROM alter_data_node('data_node_3', available=>true); SELECT * FROM chunk_query_data_node; --queries should work again on all tables @@ -896,6 +956,10 @@ SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; +-- DDL should also work again +ALTER TABLE hyper3 ADD COLUMN temp2 int; +ALTER TABLE hyper_1dim ADD COLUMN temp2 int; + -- save old port so that we can restore connectivity after we test -- changing the connection information for the data node WITH options AS ( @@ -907,10 +971,10 @@ SELECT split_part(opt, '=', 2) AS old_port FROM options WHERE opt LIKE 'port%' \gset -- also test altering host, port and database -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; SELECT * FROM alter_data_node('data_node_1', available=>true, host=>'foo.bar', port=>8989, database=>'new_db'); -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; -- just show current options: SELECT * FROM alter_data_node('data_node_1'); @@ -925,7 +989,7 @@ SELECT delete_data_node('data_node_1', drop_database=>true); -- restore configuration for data_node_1 SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1'); -SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT node_name, options FROM timescaledb_information.data_nodes order by node_name; DROP TABLE hyper1; DROP TABLE hyper2; diff --git a/tsl/test/sql/dist_copy_available_dns.sql b/tsl/test/sql/dist_copy_available_dns.sql new file mode 100644 index 000000000..6c1722a1c --- /dev/null +++ b/tsl/test/sql/dist_copy_available_dns.sql @@ -0,0 +1,155 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +-- Test distributed COPY with a bigger data set and data nodes that are marked unavailable.
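+ +-- Scenario sketch (all names as defined below): with replication_factor > 1, marking a +-- data node unavailable, e.g. SELECT * FROM alter_data_node('data_node_1', available=>false); +-- should let distributed COPY and INSERT proceed on the remaining nodes, while the AN +-- drops the stale chunk-to-data-node mappings for the down node; the +-- chunk_query_data_node view created below is used to verify that.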
+ +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; + +\set DN_DBNAME_1 :TEST_DBNAME _1 +\set DN_DBNAME_2 :TEST_DBNAME _2 +\set DN_DBNAME_3 :TEST_DBNAME _3 + +SELECT 1 FROM add_data_node('data_node_1', host => 'localhost', + database => :'DN_DBNAME_1'); +SELECT 1 FROM add_data_node('data_node_2', host => 'localhost', + database => :'DN_DBNAME_2'); +SELECT 1 FROM add_data_node('data_node_3', host => 'localhost', + database => :'DN_DBNAME_3'); +GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC; +-- Though the user on the access node already has the required GRANTS, this will propagate the GRANTS to the connected data nodes +GRANT CREATE ON SCHEMA public TO :ROLE_1; +SET ROLE :ROLE_1; + +CREATE VIEW chunk_query_data_node AS +SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node + FROM timescaledb_information.chunks ch + INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid) + INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid) + ORDER BY 1 DESC, 2 DESC; + +create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text, type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text, locality text, town text, district text, country text, category smallint); +-- Aim for about 100 partitions; the data is from 1995 to 2022. +select create_distributed_hypertable('uk_price_paid', 'date', chunk_time_interval => interval '270 day', replication_factor=>3); + +create table uk_price_paid_space2(like uk_price_paid); +select create_distributed_hypertable('uk_price_paid_space2', 'date', 'postcode2', 2, chunk_time_interval => interval '270 day', replication_factor => 2); + +\copy uk_price_paid_space2 from program 'zcat < data/prices-10k-random-1.tsv.gz'; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_space2' LIMIT 5; + +set timescaledb.max_open_chunks_per_insert = 1; + +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*) from uk_price_paid; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>false); +SET ROLE :ROLE_1; + +set timescaledb.max_open_chunks_per_insert = 2; + +-- We will write to the same set of chunks and update the AN metadata for the down DN +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*) from uk_price_paid; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + +set timescaledb.max_open_chunks_per_insert = 1117; + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>true); +SET ROLE :ROLE_1; + +TRUNCATE uk_price_paid; + +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*) from uk_price_paid; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + +select hypertable_name, replication_factor from timescaledb_information.hypertables +where hypertable_name like 'uk_price_paid%' order by hypertable_name; + +-- 0, 1, 2 rows +\copy uk_price_paid from stdin +\.
+ +select count(*) from uk_price_paid; + +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz | head -1'; + +select count(*) from uk_price_paid; + +\copy uk_price_paid from program 'zcat < data/prices-10k-random-1.tsv.gz | head -2'; + +select count(*) from uk_price_paid; + + +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; + +-- Make binary file. +\copy (select * from uk_price_paid) to 'prices-10k.pgbinary' with (format binary); + +-- Binary input with binary data transfer. +set timescaledb.enable_connection_binary_data = true; +set timescaledb.dist_copy_transfer_format = 'binary'; +create table uk_price_paid_bin(like uk_price_paid); +select create_distributed_hypertable('uk_price_paid_bin', 'date', 'postcode2', + chunk_time_interval => interval '90 day', replication_factor => 3); + +\copy uk_price_paid_bin from 'prices-10k.pgbinary' with (format binary); +select count(*), sum(price), sum(price) / count(*) from uk_price_paid_bin; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_bin' LIMIT 5; + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>false); +SET ROLE :ROLE_1; + +-- Text input with explicit format option and binary data transfer. This will +-- update AN metadata for the down DN +\copy uk_price_paid_bin from program 'zcat < data/prices-10k-random-1.tsv.gz' with (format text); +select count(*), sum(price), sum(price) / count(*) from uk_price_paid_bin; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_bin' LIMIT 5; + +-- Text input with text data transfer. +\copy uk_price_paid_bin from program 'zcat < data/prices-10k-random-1.tsv.gz'; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid_bin; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid_bin' LIMIT 5; + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_1', available=>true); +SET ROLE :ROLE_1; + +TRUNCATE uk_price_paid; + +SET timescaledb.enable_distributed_insert_with_copy=false; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_2', available=>false); +SET ROLE :ROLE_1; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_2', available=>true); +SET ROLE :ROLE_1; + +truncate uk_price_paid; +SET timescaledb.enable_distributed_insert_with_copy=true; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +SET ROLE :ROLE_CLUSTER_SUPERUSER; +SELECT * FROM alter_data_node('data_node_3', available=>false); +SET ROLE :ROLE_1; +INSERT INTO uk_price_paid SELECT * FROM uk_price_paid_bin; +select count(*), sum(price), sum(price) / count(*) from uk_price_paid; +SELECT * FROM chunk_query_data_node WHERE hypertable_name = 'uk_price_paid' LIMIT 5; + +-- Teardown +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +DROP DATABASE :DN_DBNAME_1; +DROP DATABASE :DN_DBNAME_2; +DROP DATABASE :DN_DBNAME_3;