From 2e6c6b5c58c5e139f8a34a0de30dd2590908f654 Mon Sep 17 00:00:00 2001
From: Erik Nordström
Date: Wed, 8 Mar 2023 16:58:05 +0100
Subject: [PATCH] Refactor and optimize distributed COPY

Refactor the code path that handles remote distributed COPY. The main
changes include:

* Use a hash table to look up data node connections instead of a list.
* Refactor the per-data-node buffer code that accumulates rows into
  bigger CopyData messages.
* Reduce the default number of rows in a CopyData message to 100. This
  seems to improve throughput, probably striking a better balance
  between message overhead and latency.
* The number of rows to send in each CopyData message can now be
  changed via a new foreign data wrapper option.
---
 CHANGELOG.md                                |   1 +
 tsl/src/fdw/option.c                        |  17 +-
 tsl/src/nodes/data_node_copy.c              |  24 +-
 tsl/src/remote/CMakeLists.txt               |   3 +-
 tsl/src/remote/connection.c                 |   6 +-
 tsl/src/remote/connection.h                 |   4 +-
 tsl/src/remote/dist_copy.c                  | 905 ++++++++++----------
 tsl/src/remote/dist_copy.h                  |   2 +-
 tsl/test/expected/dist_copy_format_long.out |   2 +
 tsl/test/expected/dist_remote_error-14.out  |  36 +-
 tsl/test/expected/dist_remote_error-15.out  |  36 +-
 tsl/test/expected/remote_copy-12.out        |  58 +-
 tsl/test/expected/remote_copy-13.out        |  58 +-
 tsl/test/expected/remote_copy-14.out        |  58 +-
 tsl/test/expected/remote_copy-15.out        |  58 +-
 tsl/test/sql/dist_copy_format_long.sql      |   3 +
 tsl/test/sql/remote_copy.sql.in             |   1 -
 17 files changed, 633 insertions(+), 639 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f423bfdf8..e7e3cb74d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ accidentally triggering the load of a previous DB version.**
 * #5312 Add timeout support to ping_data_node()
 * #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables
 * #5344 Enable JOINS for Hierarchical Continuous Aggregates
+* #5417 Refactor and optimize distributed COPY

 **Bugfixes**
 * #5396 Fix SEGMENTBY columns predicates to be pushed down

diff --git a/tsl/src/fdw/option.c b/tsl/src/fdw/option.c
index 36d493da0..0c7c0492e 100644
--- a/tsl/src/fdw/option.c
+++ b/tsl/src/fdw/option.c
@@ -133,7 +133,8 @@ option_validate(List *options_list, Oid catalog)
 			if (fetch_size <= 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("%s requires a non-negative integer value", def->defname)));
+						 errmsg("%s requires a positive integer value",
+								def->defname)));
 		}
 		else if (strcmp(def->defname, "available") == 0)
 		{
@@ -145,6 +146,18 @@ option_validate(List *options_list, Oid catalog)
 			/* check and store list, warn about non existing tables */
 			(void) option_extract_join_ref_table_list(defGetString(def));
 		}
+		else if (strcmp(def->defname, "copy_rows_per_message") == 0)
+		{
+			int copy_rows_per_message;
+
+			copy_rows_per_message = strtol(defGetString(def), NULL, 10);
+
+			if (copy_rows_per_message <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("%s requires a positive integer value",
+								def->defname)));
+		}
 	}
 }

@@ -170,6 +183,8 @@ init_ts_fdw_options(void)
 		{ "available", ForeignServerRelationId },
 		/* join reference tables */
 		{ "reference_tables", ForeignDataWrapperRelationId },
+		/* Rows per CopyData when ingesting with COPY */
+		{ "copy_rows_per_message", ForeignDataWrapperRelationId },
 		{ NULL, InvalidOid }
 	};

diff --git a/tsl/src/nodes/data_node_copy.c b/tsl/src/nodes/data_node_copy.c
index ae32efb9d..8ccd95de0 100644
--- a/tsl/src/nodes/data_node_copy.c
+++ b/tsl/src/nodes/data_node_copy.c
@@ -225,7 +225,6 @@ data_node_copy_exec(CustomScanState
*node) ResultRelInfo *rri_chunk = cds->rri; const ChunkInsertState *cis = rri_chunk->ri_FdwState; MemoryContext oldmctx; - bool success; const TupleDesc rri_desc = RelationGetDescr(rri_chunk->ri_RelationDesc); if (NULL != rri_chunk->ri_projectReturning && rri_desc->constr && @@ -234,25 +233,20 @@ data_node_copy_exec(CustomScanState *node) ResetPerTupleExprContext(estate); oldmctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - success = remote_copy_send_slot(dncs->copy_ctx, slot, cis); + remote_copy_send_slot(dncs->copy_ctx, slot, cis); MemoryContextSwitchTo(oldmctx); - if (!success) - slot = ExecClearTuple(slot); - else + if (has_returning) { - if (has_returning) - { - ExprContext *econtext; + ExprContext *econtext; - Assert(NULL != rri_saved->ri_projectReturning); - econtext = rri_saved->ri_projectReturning->pi_exprContext; - econtext->ecxt_scantuple = slot; - } - - if (dncs->set_processed) - estate->es_processed++; + Assert(NULL != rri_saved->ri_projectReturning); + econtext = rri_saved->ri_projectReturning->pi_exprContext; + econtext->ecxt_scantuple = slot; } + + if (dncs->set_processed) + estate->es_processed++; } } while (!has_returning && !TupIsNull(slot)); diff --git a/tsl/src/remote/CMakeLists.txt b/tsl/src/remote/CMakeLists.txt index 9a55dffa9..3cb5b2f33 100644 --- a/tsl/src/remote/CMakeLists.txt +++ b/tsl/src/remote/CMakeLists.txt @@ -21,4 +21,5 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/txn_store.c ${CMAKE_CURRENT_SOURCE_DIR}/utils.c) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) -target_include_directories(${TSL_LIBRARY_NAME} PRIVATE ${PG_INCLUDEDIR}) +target_include_directories(${TSL_LIBRARY_NAME} + PRIVATE ${PG_INCLUDEDIR} ${PG_INCLUDEDIR}/postgresql) diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 31ea3ecfd..30634dca9 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -2363,7 +2363,7 @@ err_end_copy: return false; } -bool +int remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len, TSConnectionError *err) { @@ -2371,13 +2371,13 @@ remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t l res = PQputCopyData(remote_connection_get_pg_conn(conn), buffer, len); - if (res != 1) + if (res == -1) return fill_connection_error(err, ERRCODE_CONNECTION_EXCEPTION, "could not send COPY data", conn); - return true; + return res; } static bool diff --git a/tsl/src/remote/connection.h b/tsl/src/remote/connection.h index 28263cfe6..5de04def6 100644 --- a/tsl/src/remote/connection.h +++ b/tsl/src/remote/connection.h @@ -150,8 +150,8 @@ extern RemoteConnectionStats *remote_connection_stats_get(void); extern bool remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binary, TSConnectionError *err); extern bool remote_connection_end_copy(TSConnection *conn, TSConnectionError *err); -extern bool remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len, - TSConnectionError *err); +extern int remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len, + TSConnectionError *err); /* Error handling functions for connections */ extern void remote_connection_get_error(const TSConnection *conn, TSConnectionError *err); diff --git a/tsl/src/remote/dist_copy.c b/tsl/src/remote/dist_copy.c index 43376c9b9..c2dfcfd00 100644 --- a/tsl/src/remote/dist_copy.c +++ b/tsl/src/remote/dist_copy.c @@ -3,21 +3,25 @@ * Please see the included NOTICE for copyright information and * LICENSE-TIMESCALE for a copy of the 
license. */ -#include "dist_copy.h" - #include #include #include +#include #include #include +#include #include #include #include +#include #include #include +#include +#include "dist_copy.h" #include "compat/compat.h" #include "chunk.h" +#include "config.h" #include "copy.h" #include "data_node.h" #include "dimension.h" @@ -28,6 +32,7 @@ #include "nodes/chunk_dispatch/chunk_dispatch.h" #include "nodes/chunk_dispatch/chunk_insert_state.h" #include "partitioning.h" +#include "remote/connection.h" #include "remote/connection_cache.h" #include "remote/dist_txn.h" #include "ts_catalog/chunk_data_node.h" @@ -36,18 +41,14 @@ #define DEFAULT_PG_NULL_VALUE "\\N" /* - * Maximum number of rows in batch for insert. Note that arrays of this size are - * also allocated on stack. - * Don't forget to change the broken recv() tests in dist_remote_error that test - * some boundary conditions around this number. + * Default value for maximum number of rows to send in one CopyData + * message. The setting can be tweaked at runtime via: + * + * ALTER FOREIGN DATA WRAPPER timescaledb_fdw + * OPTIONS (ADD copy_rows_per_message '200'); + * */ -#define MAX_BATCH_ROWS 1024 - -/* - * Maximum bytes of COPY data in batch. This is also the default size of the - * output copy data buffer. - */ -#define MAX_BATCH_BYTES (1 * 1024 * 1024) +#define DEFAULT_COPY_ROWS_PER_MESSAGE 100 /* This contains the information needed to parse a dimension attribute out of a row of text copy * data @@ -66,6 +67,11 @@ typedef struct DataNodeConnection { TSConnectionId id; TSConnection *connection; + size_t bytes_in_message; + size_t rows_in_message; + size_t rows_sent; + size_t outbuf_size; + char *outbuf; } DataNodeConnection; /* This contains information about connections currently in use by the copy as well as how to create @@ -85,13 +91,7 @@ typedef struct CopyConnectionState * the dist_txn layer. Chunks are created interleaved with the actual COPY * operation, so we would have to somehow maintain these two layers in sync. */ - List *data_node_connections; - - /* - * Connections to which we have written something and have to finalize them. - */ - List *connections_in_use; - + HTAB *data_node_connections; bool using_binary; const char *outgoing_copy_cmd; } CopyConnectionState; @@ -126,23 +126,20 @@ typedef struct RemoteCopyContext /* Operation data */ CopyConnectionState connection_state; Hypertable *ht; + Oid user_id; List *attnums; + Point *point; void *data_context; /* TextCopyContext or BinaryCopyContext */ bool binary_operation; MemoryContext mctx; /* MemoryContext that holds the RemoteCopyContext */ bool dns_unavailable; /* are some DNs marked as "unavailable"? */ - - /* - * Incoming rows are batched before creating the chunks and sending them to - * data nodes. The following fields contain the current batch of rows. - */ - StringInfo *batch_row_data; - int batch_row_count; - int batch_size_bytes; - Point **batch_points; - int batch_ordinal; + uint64 num_rows; + uint32 copy_rows_per_message; } RemoteCopyContext; +/* From libpq-int.c */ +extern int ts_pqPutMsgStart(char msg_type, PGconn *conn); + /* * This will create and populate a CopyDimensionInfo struct from the passed in * dimensions and values. 
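 */

To make the buffering scheme concrete: each data node connection gets its own
output buffer, rows are appended to it, and the buffer is sent as a single
CopyData message once `copy_rows_per_message` rows have accumulated. A minimal
sketch of that idea follows, assuming libpq; the `RowBuffer` type and the
`rowbuf_put()` helper are illustrative names only, not part of this patch,
which implements the same logic via `DataNodeConnection` and
`write_copy_data()`.

```c
#include <stdlib.h>
#include <string.h>
#include <libpq-fe.h>

typedef struct RowBuffer
{
	char *data;           /* accumulated row bytes */
	size_t size;          /* allocated buffer size */
	size_t len;           /* bytes currently buffered */
	int rows;             /* rows currently buffered */
	int rows_per_message; /* rows per CopyData message */
} RowBuffer;

/*
 * Append one row; once the row limit is reached, send the whole buffer as
 * one CopyData message. Returns -1 on send failure, otherwise the result
 * of PQflush() (0 = fully flushed, 1 = partial flush). Allocation error
 * handling is elided for brevity.
 */
static int
rowbuf_put(RowBuffer *buf, PGconn *conn, const char *row, size_t rowlen)
{
	if (buf->len + rowlen > buf->size)
	{
		/* Grow to double the required size to amortize reallocations */
		buf->size = (buf->len + rowlen) * 2;
		buf->data = realloc(buf->data, buf->size);
	}
	memcpy(buf->data + buf->len, row, rowlen);
	buf->len += rowlen;

	if (++buf->rows < buf->rows_per_message)
		return 0; /* keep accumulating rows in this message */

	if (PQputCopyData(conn, buf->data, (int) buf->len) == -1)
		return -1; /* connection error */

	buf->len = 0;
	buf->rows = 0;
	return PQflush(conn);
}
```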
@@ -246,48 +243,35 @@ convert_datum_to_dim_idx(Datum datum, const Dimension *d) } } -static Point * +static void calculate_hyperspace_point_from_fields(char **data, CopyDimensionInfo *dimensions, - int num_dimensions) + int num_dimensions, Point *p) { - Point *p; int i; - p = palloc0(POINT_SIZE(num_dimensions)); - p->cardinality = num_dimensions; - p->num_coords = num_dimensions; + p->cardinality = p->num_coords = num_dimensions; for (i = 0; i < num_dimensions; ++i) { Datum datum = get_copy_dimension_datum(data, &dimensions[i]); p->coordinates[i] = convert_datum_to_dim_idx(datum, dimensions[i].dim); } - - return p; } /* * Look up or set up a COPY connection to the data node. */ -static TSConnection * -get_copy_connection_to_data_node(RemoteCopyContext *context, TSConnectionId required_id) +static DataNodeConnection * +get_copy_connection_to_data_node(RemoteCopyContext *context, Oid data_node_oid) { - MemoryContext old = MemoryContextSwitchTo(context->mctx); CopyConnectionState *state = &context->connection_state; - TSConnection *connection = NULL; - ListCell *lc; - foreach (lc, state->data_node_connections) - { - DataNodeConnection *entry = (DataNodeConnection *) lfirst(lc); - if (required_id.server_id == entry->id.server_id && - required_id.user_id == entry->id.user_id) - { - connection = entry->connection; - break; - } - } + TSConnectionId required_id = remote_connection_id(data_node_oid, context->user_id); + DataNodeConnection *entry; + bool found = false; - if (connection == NULL) + entry = hash_search(state->data_node_connections, &required_id, HASH_ENTER, &found); + + if (!found) { /* * Did not find a cached connection, create a new one and cache it. @@ -298,51 +282,49 @@ get_copy_connection_to_data_node(RemoteCopyContext *context, TSConnectionId requ * uses faster functions that do this for several connections in * parallel. */ - connection = remote_dist_txn_get_connection(required_id, REMOTE_TXN_NO_PREP_STMT); - - DataNodeConnection *entry = palloc(sizeof(DataNodeConnection)); - entry->connection = connection; + MemoryContext old = MemoryContextSwitchTo(context->mctx); + entry->connection = remote_dist_txn_get_connection(required_id, REMOTE_TXN_NO_PREP_STMT); entry->id = required_id; + entry->bytes_in_message = 0; + entry->rows_in_message = 0; + entry->rows_sent = 0; +#ifdef TS_DEBUG + /* Use a small output buffer in tests to make sure we test growing the + * buffer */ + entry->outbuf_size = 10 * 1024; +#else + /* Assume one row is 1k when allocating the output buffer. If not + * enough, we will grow it later. */ + entry->outbuf_size = context->copy_rows_per_message * 1024; +#endif - state->data_node_connections = lappend(state->data_node_connections, entry); + entry->outbuf = palloc(entry->outbuf_size); + MemoryContextSwitchTo(old); } /* * Begin COPY on the connection if needed. 
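	 * A connection in CONN_IDLE state has no COPY in progress yet, so one
	 * is started below; a connection already in CONN_COPY_IN state can be
	 * written to directly.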
*/ - TSConnectionStatus status = remote_connection_get_status(connection); + TSConnectionStatus status = remote_connection_get_status(entry->connection); if (status == CONN_IDLE) { TSConnectionError err; - if (!remote_connection_begin_copy(connection, - psprintf("%s /* batch %d conn %p */", + + if (!remote_connection_begin_copy(entry->connection, + psprintf("%s /* row " INT64_FORMAT " conn %p */", state->outgoing_copy_cmd, - context->batch_ordinal, - remote_connection_get_pg_conn(connection)), + context->num_rows, + remote_connection_get_pg_conn( + entry->connection)), state->using_binary, &err)) { remote_connection_error_elog(&err, ERROR); } - - /* - * Add the connection to the list of active connections to be - * flushed later. - * The normal distributed insert path (not dist_copy, but - * data_node_copy) doesn't reset the connections when it creates - * a new chunk. So the connection status will be idle after we - * created a new chunk, but it will still be in the list of - * active connections. Don't add duplicates. - */ - if (!list_member(state->connections_in_use, connection)) - { - state->connections_in_use = lappend(state->connections_in_use, connection); - } } else if (status == CONN_COPY_IN) { /* Ready to use. */ - Assert(list_member(state->connections_in_use, connection)); } else { @@ -353,9 +335,7 @@ get_copy_connection_to_data_node(RemoteCopyContext *context, TSConnectionId requ required_id.server_id); } - MemoryContextSwitchTo(old); - - return connection; + return entry; } /* @@ -367,7 +347,17 @@ flush_active_connections(CopyConnectionState *state) /* * The connections that we are going to flush on the current iteration. */ - List *to_flush = list_copy(state->connections_in_use); + List *to_flush = NIL; + HASH_SEQ_STATUS status; + DataNodeConnection *dnc; + + hash_seq_init(&status, state->data_node_connections); + + for (dnc = hash_seq_search(&status); dnc != NULL; dnc = hash_seq_search(&status)) + { + to_flush = lappend(to_flush, dnc->connection); + } + /* * The connections that were busy on the current iteration and that we have * to wait for. @@ -497,9 +487,14 @@ end_copy_on_success(CopyConnectionState *state) { List *to_end_copy = NIL; ListCell *lc; - foreach (lc, state->connections_in_use) + HASH_SEQ_STATUS status; + DataNodeConnection *dnc; + + hash_seq_init(&status, state->data_node_connections); + + for (dnc = hash_seq_search(&status); dnc != NULL; dnc = hash_seq_search(&status)) { - TSConnection *conn = lfirst(lc); + TSConnection *conn = dnc->connection; /* * We expect the connection to be in CONN_COPY_IN status. @@ -634,8 +629,6 @@ end_copy_on_success(CopyConnectionState *state) } list_free(to_end_copy); - list_free(state->connections_in_use); - state->connections_in_use = NIL; } static void @@ -644,10 +637,14 @@ end_copy_on_failure(CopyConnectionState *state) /* Exit the copy subprotocol. 
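	 * End COPY on every connection still in COPY_IN state, recording any
	 * error but deferring the error report until all connections have
	 * been processed.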
*/ TSConnectionError err = { 0 }; bool failure = false; - ListCell *lc; - foreach (lc, state->connections_in_use) + HASH_SEQ_STATUS status; + DataNodeConnection *dnc; + + hash_seq_init(&status, state->data_node_connections); + + for (dnc = hash_seq_search(&status); dnc != NULL; dnc = hash_seq_search(&status)) { - TSConnection *conn = lfirst(lc); + TSConnection *conn = dnc->connection; if (remote_connection_get_status(conn) == CONN_COPY_IN && !remote_connection_end_copy(conn, &err)) @@ -656,30 +653,10 @@ end_copy_on_failure(CopyConnectionState *state) } } - list_free(state->connections_in_use); - state->connections_in_use = NIL; - if (failure) remote_connection_error_elog(&err, ERROR); } -static const List * -get_connections_for_chunk(RemoteCopyContext *context, int32 chunk_id, const List *chunk_data_nodes, - Oid userid) -{ - List *conns = NIL; - - ListCell *lc; - foreach (lc, chunk_data_nodes) - { - ChunkDataNode *cdn = lfirst(lc); - TSConnectionId required_id = remote_connection_id(cdn->foreign_server_oid, userid); - conns = lappend(conns, get_copy_connection_to_data_node(context, required_id)); - } - - return conns; -} - /* * Extract a quoted list of identifiers from a DefElem with arg type T_list. */ @@ -697,7 +674,7 @@ name_list_to_string(const DefElem *def) Node *name = (Node *) lfirst(lc); if (!first) - appendStringInfo(&string, ", "); + appendStringInfoString(&string, ", "); else first = false; @@ -1008,6 +985,23 @@ generate_binary_copy_context(ExprContext *econtext, const Hypertable *ht, const return ctx; } +static uint32 +get_copy_rows_per_message(void) +{ + ForeignDataWrapper *fdw = GetForeignDataWrapperByName(EXTENSION_FDW_NAME, false); + ListCell *lc; + + foreach (lc, fdw->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "copy_rows_per_message") == 0) + return strtol(defGetString(def), NULL, 10); + } + + return DEFAULT_COPY_ROWS_PER_MESSAGE; +} + RemoteCopyContext * remote_copy_begin(const CopyStmt *stmt, Hypertable *ht, ExprContext *per_tuple_ctx, List *attnums, bool binary_copy) @@ -1016,24 +1010,30 @@ remote_copy_begin(const CopyStmt *stmt, Hypertable *ht, ExprContext *per_tuple_c AllocSetContextCreate(CurrentMemoryContext, "Remote COPY", ALLOCSET_DEFAULT_SIZES); RemoteCopyContext *context; MemoryContext oldmctx; + struct HASHCTL hctl = { + .keysize = sizeof(TSConnectionId), + .entrysize = sizeof(DataNodeConnection), + .hcxt = mctx, + }; oldmctx = MemoryContextSwitchTo(mctx); context = palloc0(sizeof(RemoteCopyContext)); context->ht = ht; + context->user_id = GetUserId(); context->attnums = attnums; context->mctx = mctx; context->binary_operation = binary_copy; - context->connection_state.connections_in_use = NIL; - context->connection_state.data_node_connections = NIL; + context->connection_state.data_node_connections = + hash_create("COPY connections", + list_length(ht->data_nodes), + &hctl, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); context->connection_state.using_binary = binary_copy; context->connection_state.outgoing_copy_cmd = deparse_copy_cmd(stmt, ht, binary_copy); context->dns_unavailable = data_node_some_unavailable(); - - context->batch_row_data = palloc0(sizeof(StringInfo) * MAX_BATCH_ROWS); - context->batch_points = palloc0(sizeof(Point *) * MAX_BATCH_ROWS); - context->batch_row_count = 0; - context->batch_size_bytes = 0; - context->batch_ordinal = 0; + context->num_rows = 0; + context->copy_rows_per_message = get_copy_rows_per_message(); + context->point = palloc0(POINT_SIZE(ht->space->num_dimensions)); if (binary_copy) 
context->data_context = generate_binary_copy_context(per_tuple_ctx, ht, attnums); @@ -1075,13 +1075,11 @@ is_special_character(char c) * assumption that the text encoding used in the client (from which we received * the data) is the same as the destination data node's encoding. */ -static StringInfo -parse_next_text_row(CopyFromState cstate, List *attnums, TextCopyContext *ctx) +static bool +parse_next_text_row(CopyFromState cstate, List *attnums, TextCopyContext *ctx, StringInfo row_data) { - StringInfo row_data = makeStringInfo(); - if (!NextCopyFromRawFields(cstate, &ctx->fields, &ctx->nfields)) - return NULL; + return false; /* check for overflowing/missing fields */ if (ctx->nfields != list_length(attnums)) @@ -1167,13 +1165,13 @@ parse_next_text_row(CopyFromState cstate, List *attnums, TextCopyContext *ctx) row_data->len = len; Assert(len <= row_data->maxlen); - return row_data; + return true; } -static StringInfo -generate_binary_copy_data(Datum *values, bool *nulls, List *attnums, FmgrInfo *out_functions) +static void +write_binary_copy_data(Datum *values, bool *nulls, List *attnums, FmgrInfo *out_functions, + StringInfo row_data) { - StringInfo row_data = makeStringInfo(); uint16 buf16; uint32 buf32; ListCell *lc; @@ -1203,36 +1201,29 @@ generate_binary_copy_data(Datum *values, bool *nulls, List *attnums, FmgrInfo *o appendBinaryStringInfo(row_data, VARDATA(outputbytes), output_length); } } - - return row_data; } -static StringInfo -parse_next_binary_row(CopyFromState cstate, List *attnums, BinaryCopyContext *ctx) +static bool +parse_next_binary_row(CopyFromState cstate, List *attnums, BinaryCopyContext *ctx, + StringInfo row_data) { MemoryContext old = MemoryContextSwitchTo(ctx->econtext->ecxt_per_tuple_memory); bool result = NextCopyFrom(cstate, ctx->econtext, ctx->values, ctx->nulls); MemoryContextSwitchTo(old); if (!result) - return NULL; + return false; - return generate_binary_copy_data(ctx->values, ctx->nulls, attnums, ctx->out_functions); + write_binary_copy_data(ctx->values, ctx->nulls, attnums, ctx->out_functions, row_data); + return true; } -static Point * -get_current_point_for_text_copy(TextCopyContext *ctx) +static void +calculate_hyperspace_point_from_binary(Datum *values, bool *nulls, const Hyperspace *space, + Point *p) { - return calculate_hyperspace_point_from_fields(ctx->fields, ctx->dimensions, ctx->ndimensions); -} - -static Point * -calculate_hyperspace_point_from_binary(Datum *values, bool *nulls, const Hyperspace *space) -{ - Point *p; int i; - p = palloc0(POINT_SIZE(space->num_dimensions)); p->cardinality = space->num_dimensions; p->num_coords = space->num_dimensions; @@ -1248,287 +1239,39 @@ calculate_hyperspace_point_from_binary(Datum *values, bool *nulls, const Hypersp NameStr(dim->fd.column_name)))); p->coordinates[i] = convert_datum_to_dim_idx(datum, dim); } - - return p; } static Point * -get_current_point_for_binary_copy(BinaryCopyContext *ctx, const Hyperspace *hs) +read_next_copy_row(RemoteCopyContext *context, CopyFromState cstate, StringInfo row_data) { - return calculate_hyperspace_point_from_binary(ctx->values, ctx->nulls, hs); -} + bool read_row; -static bool -read_next_copy_row(RemoteCopyContext *context, CopyFromState cstate) -{ - Point *point = NULL; - Hypertable *ht = context->ht; - StringInfo row_data; + if (context->binary_operation) + read_row = parse_next_binary_row(cstate, context->attnums, context->data_context, row_data); + else + read_row = parse_next_text_row(cstate, context->attnums, context->data_context, row_data); + + if 
(!read_row) + return NULL; if (context->binary_operation) { - row_data = parse_next_binary_row(cstate, context->attnums, context->data_context); + BinaryCopyContext *ctx = context->data_context; + calculate_hyperspace_point_from_binary(ctx->values, + ctx->nulls, + context->ht->space, + context->point); } else { - row_data = parse_next_text_row(cstate, context->attnums, context->data_context); + TextCopyContext *ctx = context->data_context; + calculate_hyperspace_point_from_fields(ctx->fields, + ctx->dimensions, + ctx->ndimensions, + context->point); } - if (row_data == NULL) - { - return false; - } - - if (context->binary_operation) - { - point = get_current_point_for_binary_copy(context->data_context, ht->space); - } - else - { - point = get_current_point_for_text_copy(context->data_context); - } - - Assert(context->batch_row_count < MAX_BATCH_ROWS); - context->batch_row_data[context->batch_row_count] = row_data; - context->batch_points[context->batch_row_count] = point; - - context->batch_row_count++; - context->batch_size_bytes += row_data->len; - - return true; -} - -static bool -send_copy_data(StringInfo row_data, const List *connections) -{ - ListCell *lc; - - foreach (lc, connections) - { - TSConnection *conn = lfirst(lc); - TSConnectionError err; - - if (!remote_connection_put_copy_data(conn, row_data->data, row_data->len, &err)) - remote_connection_error_elog(&err, ERROR); - } - - return true; -} - -/* - * Rows for sending to a particular data node. - */ -typedef struct DataNodeRows -{ - int data_node_id; - Oid server_oid; - TSConnection *connection; - int rows_total; - - /* Array of indices into the batch row array. */ - int *row_indices; -} DataNodeRows; - -static void -remote_copy_process_and_send_data(RemoteCopyContext *context) -{ - Hypertable *ht = context->ht; - const int n = context->batch_row_count; - Assert(n <= MAX_BATCH_ROWS); - int32 chunk_id = INVALID_CHUNK_ID; - List *chunk_data_nodes = NIL; - - /* - * This list tracks the per-batch insert states of the data nodes - * (DataNodeRows). - */ - List *data_nodes = NIL; - - /* For each row, find or create the destination chunk. */ - bool did_end_copy = false; - for (int row_in_batch = 0; row_in_batch < n; row_in_batch++) - { - Point *point = context->batch_points[row_in_batch]; - bool found; - - Chunk *chunk = ts_hypertable_find_chunk_for_point(ht, point); - if (chunk == NULL) - { - if (!did_end_copy) - { - /* - * The data node connections have to be flushed and the COPY - * query ended before creating - * a new chunk. They might have outstanding COPY data from the - * previous batch. - */ - end_copy_on_success(&context->connection_state); - did_end_copy = true; - } - chunk = ts_hypertable_create_chunk_for_point(ht, point, &found); - } - else - found = true; - - /* - * Get the filtered list of "available" DNs for this chunk but only if it's replicated. We - * only fetch the filtered list once. Assuming that inserts will typically go to the same - * chunk we should be able to reuse this filtered list a few more times - * - * The worse case scenario is one in which INSERT1 goes into CHUNK1, INSERT2 goes into - * CHUNK2, INSERT3 goes into CHUNK1,... in which case we will end up refreshing the list - * everytime - * - * We will also enter the below loop if we KNOW that any of the DNs has been marked - * unavailable before we started this transaction. If not, then we know that every chunk's - * datanode list is fine and no stale chunk metadata updates are needed. 
- */ - if (context->dns_unavailable && found && ht->fd.replication_factor > 1) - { - if (chunk_id != chunk->fd.id) - chunk_id = INVALID_CHUNK_ID; - - if (chunk_id == INVALID_CHUNK_ID) - { - if (chunk_data_nodes) - list_free(chunk_data_nodes); - chunk_data_nodes = - ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id, CurrentMemoryContext); - chunk_id = chunk->fd.id; - } - - Assert(chunk_id == chunk->fd.id); - Assert(chunk_data_nodes != NIL); - /* - * If the chunk was not created as part of this insert, we need to check whether any - * of the chunk's data nodes are currently unavailable and in that case consider the - * chunk stale on those data nodes. Do that by removing the AN's chunk-datanode - * mapping for the unavailable data nodes. - * - * Note that the metadata will only get updated once since we assign the chunk's - * data_node list to the list of available DNs the first time this - * dist_update_stale_chunk_metadata API gets called. So both chunk_data_nodes and - * chunk->data_nodes will point to the same list and no subsequent metadata updates will - * occur. - */ - if (ht->fd.replication_factor > list_length(chunk_data_nodes)) - ts_cm_functions->dist_update_stale_chunk_metadata(chunk, chunk_data_nodes); - } - - /* - * For remote copy, we don't use chunk insert states on the AN. - * So we need to explicitly set the chunk as partial when copies - * are directed to previously compressed chunks. - */ - if (ts_chunk_is_compressed(chunk) && (!ts_chunk_is_partial(chunk))) - ts_chunk_set_partial(chunk); - - /* - * Schedule the row for sending to the data nodes containing the chunk. - */ - ListCell *lc; - foreach (lc, chunk->data_nodes) - { - ChunkDataNode *chunk_data_node = lfirst(lc); - /* Find the existing insert state for this data node. */ - DataNodeRows *data_node_rows = NULL; - ListCell *lc2; - foreach (lc2, data_nodes) - { - data_node_rows = lfirst(lc2); - if (chunk_data_node->foreign_server_oid == data_node_rows->server_oid) - { - break; - } - } - - if (lc2 == NULL) - { - /* No insert state for this data node yet. Create it. */ - data_node_rows = palloc(sizeof(DataNodeRows)); - data_node_rows->connection = NULL; - data_node_rows->server_oid = chunk_data_node->foreign_server_oid; - data_node_rows->rows_total = 0; - /* - * Not every tuple in a batch might be sent to every data node, - * but we allocate the maximum possible size to avoid resizing. - */ - data_node_rows->row_indices = palloc(sizeof(int) * context->batch_row_count); - data_nodes = lappend(data_nodes, data_node_rows); - } - - Assert(data_node_rows->server_oid == chunk_data_node->foreign_server_oid); - - data_node_rows->row_indices[data_node_rows->rows_total] = row_in_batch; - data_node_rows->rows_total++; - } - } - - /* - * Flush the previous batch to avoid growing the outgoing buffers - * indefinitely if some data node is not keeping up. It would be more - * efficient to check for buffer growth and only flush then, but libpq - * doesn't provide a way to know the outgoing buffer size. It also doesn't - * provide any way to control the outgoing buffer size. - * Don't do it if we have ended the COPY above to create new chunks. - * The number 11 is an arbitrary prime, growing the output buffer to at - * most 11ki rows sounds reasonable. - */ - if (!did_end_copy && context->batch_ordinal % 11 == 0) - { - flush_active_connections(&context->connection_state); - } - - /* - * Actually send the data to the data nodes. We don't interleave the data - * nodes here, because the batches are relatively small. 
- */ - StringInfoData copy_data = { .data = palloc(MAX_BATCH_BYTES), .maxlen = MAX_BATCH_BYTES }; - ListCell *lc; - foreach (lc, data_nodes) - { - DataNodeRows *dn = lfirst(lc); - TSConnectionId required_id = remote_connection_id(dn->server_oid, GetUserId()); - Assert(dn->connection == NULL); - dn->connection = get_copy_connection_to_data_node(context, required_id); - PGconn *pg_conn = remote_connection_get_pg_conn(dn->connection); - - resetStringInfo(©_data); - for (int row = 0; row < dn->rows_total; row++) - { - StringInfo row_data = context->batch_row_data[dn->row_indices[row]]; - appendBinaryStringInfo(©_data, row_data->data, row_data->len); - } - - /* - * Send the copy data to the remote server. - * It can't really return 0 ("would block") until it runs out - * of memory. It just grows the buffer and tries to flush in - * pqPutMsgEnd(). - */ - int res = PQputCopyData(pg_conn, copy_data.data, copy_data.len); - - if (res == -1) - { - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), - errmsg("could not send COPY data"), - errdetail("%s", PQerrorMessage(pg_conn)))); - } - - /* - * We don't have to specially flush the data here, because the flush is - * attempted after finishing each protocol message (pqPutMsgEnd()). - */ - } - - /* reset static vars before returning */ - chunk_id = INVALID_CHUNK_ID; - if (chunk_data_nodes) - { - list_free(chunk_data_nodes); - chunk_data_nodes = NIL; - } + return context->point; } /* @@ -1545,58 +1288,301 @@ remote_copy_end_on_success(RemoteCopyContext *context) MemoryContextDelete(context->mctx); } +/* + * Write data to a CopyData message. + * + * The function allows writing multiple rows to the same CopyData message to + * reduce overhead. The CopyData message will automatically be "ended" and + * flushed when it reaches its max number of rows. + * + * Returns 0 on success, -1 on failure and 1 if the CopyData message could not + * be completely flushed. + */ +static int +write_copy_data(RemoteCopyContext *context, DataNodeConnection *dnc, const char *data, size_t len, + bool endmsg) +{ + PGconn *conn = remote_connection_get_pg_conn(dnc->connection); + + /* + * Check if the message buffer is big enough to fit the additional + * data. Otherwise compute the required size and double that. + */ + if (dnc->bytes_in_message + len > dnc->outbuf_size) + { + char *newbuf; + size_t newsize = (dnc->bytes_in_message + len) * 2; + MemoryContext old = MemoryContextSwitchTo(context->mctx); + newbuf = repalloc(dnc->outbuf, newsize); + dnc->outbuf = newbuf; + dnc->outbuf_size = newsize; + MemoryContextSwitchTo(old); + } + + memcpy(dnc->outbuf + dnc->bytes_in_message, data, len); + dnc->bytes_in_message += len; + dnc->rows_in_message++; + dnc->rows_sent++; + + if (endmsg || dnc->rows_in_message >= context->copy_rows_per_message) + { + int ret = PQputCopyData(conn, dnc->outbuf, dnc->bytes_in_message); + + if (ret == 0) + { + /* Not queued, full buffers and could not allocate memory. This is + * a very unlikely situation, and it is possible to try to flush + * and wait on writeability instead of failing. */ + elog(ERROR, "could not allocate memory for COPY data"); + } + else if (ret == -1) + { + return -1; + } + else + { + /* Successfully queued */ + Assert(ret == 1); + } + + /* Clear output buffer and flush */ + dnc->bytes_in_message = 0; + dnc->rows_in_message = 0; + + return PQflush(conn); + } + + return 0; +} + +/* + * Flush every pending message on data node connections. 
+ */ +static void +write_copy_data_end(RemoteCopyContext *context) +{ + HASH_SEQ_STATUS status; + DataNodeConnection *dnc; + int num_blocked_nodes = 0; + + hash_seq_init(&status, context->connection_state.data_node_connections); + + for (dnc = hash_seq_search(&status); dnc != NULL; dnc = hash_seq_search(&status)) + { + if (dnc->bytes_in_message > 0) + { + PGconn *conn = remote_connection_get_pg_conn(dnc->connection); + int ret = PQputCopyData(conn, dnc->outbuf, dnc->bytes_in_message); + + if (ret == 0) + { + /* Not queued, full buffers and could not allocate memory */ + elog(ERROR, "could not allocate memory for COPY data"); + } + else if (ret == -1) + { + remote_connection_elog(dnc->connection, ERROR); + } + else + { + /* Successfully queued */ + Assert(ret == 1); + } + + ret = PQflush(conn); + + switch (ret) + { + case -1: + /* failure */ + remote_connection_elog(dnc->connection, ERROR); + break; + case 0: + /* flushed */ + break; + default: + Assert(ret == 1); + /* partial flush */ + num_blocked_nodes++; + break; + } + + dnc->bytes_in_message = 0; + dnc->rows_in_message = 0; + } + } + + if (num_blocked_nodes > 0) + flush_active_connections(&context->connection_state); +} + +static void +send_row_to_data_nodes(RemoteCopyContext *context, List *data_nodes, StringInfo row_data, + bool endmsg) +{ + ListCell *lc; + int num_blocked_nodes = 0; + + foreach (lc, data_nodes) + { + ChunkDataNode *chunk_data_node = lfirst(lc); + /* Find the existing insert state for this data node. */ + DataNodeConnection *dnc = + get_copy_connection_to_data_node(context, chunk_data_node->foreign_server_oid); + int ret; + + ret = write_copy_data(context, dnc, row_data->data, row_data->len, endmsg); + + if (ret == -1) + remote_connection_elog(dnc->connection, ERROR); + else if (ret == 1) + { + /* Could not flush completely */ + num_blocked_nodes++; + } + else + { + Assert(ret == 0); + } + } + + if (num_blocked_nodes > 0) + flush_active_connections(&context->connection_state); +} + uint64 remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums) { - MemoryContext oldmctx = CurrentMemoryContext; + MemoryContext orig_mctx = CurrentMemoryContext; EState *estate = ccstate->estate; Hypertable *ht = ccstate->dispatch->hypertable; - RemoteCopyContext *context = remote_copy_begin(stmt, - ht, - GetPerTupleExprContext(estate), - attnums, - copy_should_send_binary(stmt)); - uint64 processed = 0; - - MemoryContext batch_context = - AllocSetContextCreate(CurrentMemoryContext, "Remote COPY batch", ALLOCSET_DEFAULT_SIZES); + RemoteCopyContext *context; + uint64 processed; + context = remote_copy_begin(stmt, + ht, + GetPerTupleExprContext(estate), + attnums, + copy_should_send_binary(stmt)); PG_TRY(); { - MemoryContextSwitchTo(batch_context); + StringInfoData row_data; + List *chunk_data_nodes = NIL; + int32 chunk_id = INVALID_CHUNK_ID; + Chunk *chunk = NULL; + + initStringInfo(&row_data); + + MemoryContextSwitchTo(GetPerTupleMemoryContext(ccstate->estate)); + while (true) { + bool found; + Point *p; + + Assert(CurrentMemoryContext == GetPerTupleMemoryContext(ccstate->estate)); + CHECK_FOR_INTERRUPTS(); - ResetPerTupleExprContext(ccstate->estate); + resetStringInfo(&row_data); - /* Actually process the next row. 
-			bool eof = !read_next_copy_row(context, ccstate->cstate);
-			if (!eof && context->batch_row_count < MAX_BATCH_ROWS &&
-				context->batch_size_bytes < MAX_BATCH_BYTES)
+			p = read_next_copy_row(context, ccstate->cstate, &row_data);
+
+			if (p == NULL)
+				break;
+
+			chunk = ts_hypertable_find_chunk_for_point(ht, p);
+
+			if (chunk == NULL)
 			{
 				/*
-				 * Accumulate more rows into the current batch.
+				 * Since at least one connection will switch out of COPY_IN
+				 * mode to create the chunk, we need to first end the current
+				 * CopyData message and flush the data. Do it on all
+				 * connections, even though only one connection might be
+				 * affected (unless replication_factor > 1).
 				 */
-				continue;
+				write_copy_data_end(context);
+
+				/* No need to exit COPY_IN mode in order to create the chunk
+				 * on the same connection; it is handled automatically by
+				 * remote_dist_txn_get_connection(). */
+				chunk = ts_hypertable_create_chunk_for_point(ht, p, &found);
+			}
+			else
+				found = true;
+
+			/*
+			 * Get the filtered list of "available" DNs for this chunk but only if it's replicated.
+			 * We only fetch the filtered list once. Assuming that inserts will typically go to the
+			 * same chunk, we should be able to reuse this filtered list a few more times.
+			 *
+			 * The worst-case scenario is one in which INSERT1 goes into CHUNK1, INSERT2 goes into
+			 * CHUNK2, INSERT3 goes into CHUNK1,... in which case we will end up refreshing the list
+			 * every time.
+			 *
+			 * We will also enter the below loop if we KNOW that any of the DNs has been marked
+			 * unavailable before we started this transaction. If not, then we know that every
+			 * chunk's datanode list is fine and no stale chunk metadata updates are needed.
+			 */
+			if (context->dns_unavailable && found && ht->fd.replication_factor > 1)
+			{
+				MemoryContext oldmctx = MemoryContextSwitchTo(context->mctx);
+
+				if (chunk_id != chunk->fd.id)
+					chunk_id = INVALID_CHUNK_ID;
+
+				if (chunk_id == INVALID_CHUNK_ID)
+				{
+					if (chunk_data_nodes)
+						list_free(chunk_data_nodes);
+					chunk_data_nodes =
+						ts_chunk_data_node_scan_by_chunk_id_filter(chunk->fd.id,
+																   CurrentMemoryContext);
+					chunk_id = chunk->fd.id;
+				}
+
+				Assert(chunk_id == chunk->fd.id);
+				Assert(chunk_data_nodes != NIL);
+				/*
+				 * If the chunk was not created as part of this insert, we need to check whether any
+				 * of the chunk's data nodes are currently unavailable and in that case consider the
+				 * chunk stale on those data nodes. Do that by removing the AN's chunk-datanode
+				 * mapping for the unavailable data nodes.
+				 *
+				 * Note that the metadata will only get updated once since we assign the chunk's
+				 * data_node list to the list of available DNs the first time this
+				 * dist_update_stale_chunk_metadata API gets called. So both chunk_data_nodes and
+				 * chunk->data_nodes will point to the same list and no subsequent metadata updates
+				 * will occur.
+				 */
+				if (ht->fd.replication_factor > list_length(chunk_data_nodes))
+					ts_cm_functions->dist_update_stale_chunk_metadata(chunk, chunk_data_nodes);
+
+				MemoryContextSwitchTo(oldmctx);
 			}

 			/*
-			 * Send out the current batch.
+			 * For remote copy, we don't use chunk insert states on the AN.
+			 * So we need to explicitly set the chunk as partial when copies
+			 * are directed to previously compressed chunks.
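+			 * Marking the chunk partial tells the rest of the system that it
+			 * now contains uncompressed rows alongside the compressed data.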
*/ - remote_copy_process_and_send_data(context); + if (ts_chunk_is_compressed(chunk) && (!ts_chunk_is_partial(chunk))) + ts_chunk_set_partial(chunk); - processed += context->batch_row_count; - context->batch_row_count = 0; - context->batch_size_bytes = 0; - context->batch_ordinal++; - MemoryContextReset(batch_context); - - if (eof) - { - break; - } + /* + * Write the copy data to the data node connections. + */ + send_row_to_data_nodes(context, chunk->data_nodes, &row_data, false); + context->num_rows++; } + + /* End the final CopyData messages, if any, and flush. */ + write_copy_data_end(context); + MemoryContextSwitchTo(orig_mctx); + + if (chunk_data_nodes) + list_free(chunk_data_nodes); } PG_CATCH(); { @@ -1607,8 +1593,9 @@ remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *att } PG_END_TRY(); + processed = context->num_rows; + remote_copy_end_on_success(context); - MemoryContextSwitchTo(oldmctx); return processed; } @@ -1620,12 +1607,13 @@ remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *att * data is already "routed" to the "right" chunk as indicated by the chunk * insert state. */ -bool +void remote_copy_send_slot(RemoteCopyContext *context, TupleTableSlot *slot, const ChunkInsertState *cis) { ListCell *lc; - bool result; - StringInfo row_data; + StringInfoData row_data; + + initStringInfo(&row_data); /* Pre-materialize all attributes since we will access all of them */ slot_getallattrs(slot); @@ -1644,18 +1632,17 @@ remote_copy_send_slot(RemoteCopyContext *context, TupleTableSlot *slot, const Ch binctx->values[i] = slot_getattr(slot, attnum, &binctx->nulls[i]); } - row_data = generate_binary_copy_data(binctx->values, - binctx->nulls, - context->attnums, - binctx->out_functions); + write_binary_copy_data(binctx->values, + binctx->nulls, + context->attnums, + binctx->out_functions, + &row_data); } else { TextCopyContext *textctx = context->data_context; char delim = textctx->delimiter; - row_data = makeStringInfo(); - foreach (lc, context->attnums) { AttrNumber attnum = lfirst_int(lc); @@ -1668,25 +1655,19 @@ remote_copy_send_slot(RemoteCopyContext *context, TupleTableSlot *slot, const Ch value = slot_getattr(slot, attnum, &isnull); if (isnull) - appendStringInfo(row_data, "%s%c", textctx->null_string, delim); + appendStringInfo(&row_data, "%s%c", textctx->null_string, delim); else { int off = AttrNumberGetAttrOffset(attnum); const char *output = OutputFunctionCall(&textctx->out_functions[off], value); - appendStringInfo(row_data, "%s%c", output, delim); + appendStringInfo(&row_data, "%s%c", output, delim); } } } PG_TRY(); { - const List *connections; - - connections = - get_connections_for_chunk(context, cis->chunk_id, cis->chunk_data_nodes, cis->user_id); - Assert(list_length(connections) == list_length(cis->chunk_data_nodes)); - Assert(list_length(connections) > 0); - result = send_copy_data(row_data, connections); + send_row_to_data_nodes(context, cis->chunk_data_nodes, &row_data, true); } PG_CATCH(); { @@ -1696,6 +1677,4 @@ remote_copy_send_slot(RemoteCopyContext *context, TupleTableSlot *slot, const Ch PG_RE_THROW(); } PG_END_TRY(); - - return result; } diff --git a/tsl/src/remote/dist_copy.h b/tsl/src/remote/dist_copy.h index 43d23c2ef..ee23639c0 100644 --- a/tsl/src/remote/dist_copy.h +++ b/tsl/src/remote/dist_copy.h @@ -19,7 +19,7 @@ extern RemoteCopyContext *remote_copy_begin(const CopyStmt *stmt, Hypertable *ht ExprContext *per_tuple_ctx, List *attnums, bool binary_copy); extern void 
remote_copy_end_on_success(RemoteCopyContext *context); -extern bool remote_copy_send_slot(RemoteCopyContext *context, TupleTableSlot *slot, +extern void remote_copy_send_slot(RemoteCopyContext *context, TupleTableSlot *slot, const ChunkInsertState *cis); extern const char *remote_copy_get_copycmd(RemoteCopyContext *context); diff --git a/tsl/test/expected/dist_copy_format_long.out b/tsl/test/expected/dist_copy_format_long.out index 4fbfbc2d4..e0ce04161 100644 --- a/tsl/test/expected/dist_copy_format_long.out +++ b/tsl/test/expected/dist_copy_format_long.out @@ -31,6 +31,8 @@ SELECT 1 FROM add_data_node('data_node_3', host => 'localhost', GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC; -- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes GRANT CREATE ON SCHEMA public TO :ROLE_1; +-- buffer a lot of rows per message to test buffer expansion +ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD copy_rows_per_message '1000'); SET ROLE :ROLE_1; -- Aim to about 100 partitions, the data is from 1995 to 2022. create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text, type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text, locality text, town text, district text, country text, category smallint); diff --git a/tsl/test/expected/dist_remote_error-14.out b/tsl/test/expected/dist_remote_error-14.out index de42836df..c8ae7cb1e 100644 --- a/tsl/test/expected/dist_remote_error-14.out +++ b/tsl/test/expected/dist_remote_error-14.out @@ -322,20 +322,20 @@ select * from metrics_dist_ss; Output: metrics_dist_ss_1."time", metrics_dist_ss_1.device_id, metrics_dist_ss_1.v0, metrics_dist_ss_1.v1, metrics_dist_ss_1.v2, metrics_dist_ss_1.v3 Data node: db_dist_remote_error_1 Fetcher Type: Prepared statement - Chunks: _dist_hyper_5_96_chunk, _dist_hyper_5_99_chunk, _dist_hyper_5_102_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[54, 55, 56]) + Chunks: _dist_hyper_5_123_chunk, _dist_hyper_5_126_chunk, _dist_hyper_5_129_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[57, 58, 59]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_2 (actual rows=13680 loops=1) Output: metrics_dist_ss_2."time", metrics_dist_ss_2.device_id, metrics_dist_ss_2.v0, metrics_dist_ss_2.v1, metrics_dist_ss_2.v2, metrics_dist_ss_2.v3 Data node: db_dist_remote_error_2 Fetcher Type: Prepared statement - Chunks: _dist_hyper_5_97_chunk, _dist_hyper_5_100_chunk, _dist_hyper_5_103_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[48, 49, 50]) + Chunks: _dist_hyper_5_124_chunk, _dist_hyper_5_127_chunk, _dist_hyper_5_130_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[49, 50, 51]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_3 (actual rows=4560 loops=1) Output: metrics_dist_ss_3."time", metrics_dist_ss_3.device_id, metrics_dist_ss_3.v0, metrics_dist_ss_3.v1, metrics_dist_ss_3.v2, metrics_dist_ss_3.v3 Data node: db_dist_remote_error_3 Fetcher Type: Prepared statement - Chunks: _dist_hyper_5_98_chunk, _dist_hyper_5_101_chunk, 
_dist_hyper_5_104_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[32, 33, 34]) + Chunks: _dist_hyper_5_125_chunk, _dist_hyper_5_128_chunk, _dist_hyper_5_131_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[38, 39, 40]) (21 rows) set timescaledb.remote_data_fetcher = 'copy'; @@ -350,20 +350,20 @@ select * from metrics_dist_ss; Output: metrics_dist_ss_1."time", metrics_dist_ss_1.device_id, metrics_dist_ss_1.v0, metrics_dist_ss_1.v1, metrics_dist_ss_1.v2, metrics_dist_ss_1.v3 Data node: db_dist_remote_error_1 Fetcher Type: COPY - Chunks: _dist_hyper_5_96_chunk, _dist_hyper_5_99_chunk, _dist_hyper_5_102_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[54, 55, 56]) + Chunks: _dist_hyper_5_123_chunk, _dist_hyper_5_126_chunk, _dist_hyper_5_129_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[57, 58, 59]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_2 (actual rows=13680 loops=1) Output: metrics_dist_ss_2."time", metrics_dist_ss_2.device_id, metrics_dist_ss_2.v0, metrics_dist_ss_2.v1, metrics_dist_ss_2.v2, metrics_dist_ss_2.v3 Data node: db_dist_remote_error_2 Fetcher Type: COPY - Chunks: _dist_hyper_5_97_chunk, _dist_hyper_5_100_chunk, _dist_hyper_5_103_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[48, 49, 50]) + Chunks: _dist_hyper_5_124_chunk, _dist_hyper_5_127_chunk, _dist_hyper_5_130_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[49, 50, 51]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_3 (actual rows=4560 loops=1) Output: metrics_dist_ss_3."time", metrics_dist_ss_3.device_id, metrics_dist_ss_3.v0, metrics_dist_ss_3.v1, metrics_dist_ss_3.v2, metrics_dist_ss_3.v3 Data node: db_dist_remote_error_3 Fetcher Type: COPY - Chunks: _dist_hyper_5_98_chunk, _dist_hyper_5_101_chunk, _dist_hyper_5_104_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[32, 33, 34]) + Chunks: _dist_hyper_5_125_chunk, _dist_hyper_5_128_chunk, _dist_hyper_5_131_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[38, 39, 40]) (21 rows) set timescaledb.remote_data_fetcher = 'cursor'; @@ -378,20 +378,20 @@ select * from metrics_dist_ss; Output: metrics_dist_ss_1."time", metrics_dist_ss_1.device_id, metrics_dist_ss_1.v0, metrics_dist_ss_1.v1, metrics_dist_ss_1.v2, metrics_dist_ss_1.v3 Data node: db_dist_remote_error_1 Fetcher Type: Cursor - Chunks: _dist_hyper_5_96_chunk, _dist_hyper_5_99_chunk, _dist_hyper_5_102_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[54, 55, 56]) + Chunks: _dist_hyper_5_123_chunk, _dist_hyper_5_126_chunk, _dist_hyper_5_129_chunk + Remote SQL: SELECT "time", device_id, v0, v1, 
v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[57, 58, 59]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_2 (actual rows=13680 loops=1) Output: metrics_dist_ss_2."time", metrics_dist_ss_2.device_id, metrics_dist_ss_2.v0, metrics_dist_ss_2.v1, metrics_dist_ss_2.v2, metrics_dist_ss_2.v3 Data node: db_dist_remote_error_2 Fetcher Type: Cursor - Chunks: _dist_hyper_5_97_chunk, _dist_hyper_5_100_chunk, _dist_hyper_5_103_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[48, 49, 50]) + Chunks: _dist_hyper_5_124_chunk, _dist_hyper_5_127_chunk, _dist_hyper_5_130_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[49, 50, 51]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_3 (actual rows=4560 loops=1) Output: metrics_dist_ss_3."time", metrics_dist_ss_3.device_id, metrics_dist_ss_3.v0, metrics_dist_ss_3.v1, metrics_dist_ss_3.v2, metrics_dist_ss_3.v3 Data node: db_dist_remote_error_3 Fetcher Type: Cursor - Chunks: _dist_hyper_5_98_chunk, _dist_hyper_5_101_chunk, _dist_hyper_5_104_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[32, 33, 34]) + Chunks: _dist_hyper_5_125_chunk, _dist_hyper_5_128_chunk, _dist_hyper_5_131_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[38, 39, 40]) (21 rows) -- Incorrect int output, to cover the error handling in tuplefactory. 
diff --git a/tsl/test/expected/dist_remote_error-15.out b/tsl/test/expected/dist_remote_error-15.out index a434714c6..707a57453 100644 --- a/tsl/test/expected/dist_remote_error-15.out +++ b/tsl/test/expected/dist_remote_error-15.out @@ -325,20 +325,20 @@ select * from metrics_dist_ss; Output: metrics_dist_ss_1."time", metrics_dist_ss_1.device_id, metrics_dist_ss_1.v0, metrics_dist_ss_1.v1, metrics_dist_ss_1.v2, metrics_dist_ss_1.v3 Data node: db_dist_remote_error_1 Fetcher Type: Prepared statement - Chunks: _dist_hyper_5_96_chunk, _dist_hyper_5_99_chunk, _dist_hyper_5_102_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[54, 55, 56]) + Chunks: _dist_hyper_5_123_chunk, _dist_hyper_5_126_chunk, _dist_hyper_5_129_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[57, 58, 59]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_2 (actual rows=13680 loops=1) Output: metrics_dist_ss_2."time", metrics_dist_ss_2.device_id, metrics_dist_ss_2.v0, metrics_dist_ss_2.v1, metrics_dist_ss_2.v2, metrics_dist_ss_2.v3 Data node: db_dist_remote_error_2 Fetcher Type: Prepared statement - Chunks: _dist_hyper_5_97_chunk, _dist_hyper_5_100_chunk, _dist_hyper_5_103_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[48, 49, 50]) + Chunks: _dist_hyper_5_124_chunk, _dist_hyper_5_127_chunk, _dist_hyper_5_130_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[49, 50, 51]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_3 (actual rows=4560 loops=1) Output: metrics_dist_ss_3."time", metrics_dist_ss_3.device_id, metrics_dist_ss_3.v0, metrics_dist_ss_3.v1, metrics_dist_ss_3.v2, metrics_dist_ss_3.v3 Data node: db_dist_remote_error_3 Fetcher Type: Prepared statement - Chunks: _dist_hyper_5_98_chunk, _dist_hyper_5_101_chunk, _dist_hyper_5_104_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[32, 33, 34]) + Chunks: _dist_hyper_5_125_chunk, _dist_hyper_5_128_chunk, _dist_hyper_5_131_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[38, 39, 40]) (21 rows) set timescaledb.remote_data_fetcher = 'copy'; @@ -353,20 +353,20 @@ select * from metrics_dist_ss; Output: metrics_dist_ss_1."time", metrics_dist_ss_1.device_id, metrics_dist_ss_1.v0, metrics_dist_ss_1.v1, metrics_dist_ss_1.v2, metrics_dist_ss_1.v3 Data node: db_dist_remote_error_1 Fetcher Type: COPY - Chunks: _dist_hyper_5_96_chunk, _dist_hyper_5_99_chunk, _dist_hyper_5_102_chunk - Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[54, 55, 56]) + Chunks: _dist_hyper_5_123_chunk, _dist_hyper_5_126_chunk, _dist_hyper_5_129_chunk + Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[57, 58, 59]) -> Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_2 (actual rows=13680 loops=1) 
         Output: metrics_dist_ss_2."time", metrics_dist_ss_2.device_id, metrics_dist_ss_2.v0, metrics_dist_ss_2.v1, metrics_dist_ss_2.v2, metrics_dist_ss_2.v3
         Data node: db_dist_remote_error_2
         Fetcher Type: COPY
-        Chunks: _dist_hyper_5_97_chunk, _dist_hyper_5_100_chunk, _dist_hyper_5_103_chunk
-        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[48, 49, 50])
+        Chunks: _dist_hyper_5_124_chunk, _dist_hyper_5_127_chunk, _dist_hyper_5_130_chunk
+        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[49, 50, 51])
   ->  Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_3 (actual rows=4560 loops=1)
         Output: metrics_dist_ss_3."time", metrics_dist_ss_3.device_id, metrics_dist_ss_3.v0, metrics_dist_ss_3.v1, metrics_dist_ss_3.v2, metrics_dist_ss_3.v3
         Data node: db_dist_remote_error_3
         Fetcher Type: COPY
-        Chunks: _dist_hyper_5_98_chunk, _dist_hyper_5_101_chunk, _dist_hyper_5_104_chunk
-        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[32, 33, 34])
+        Chunks: _dist_hyper_5_125_chunk, _dist_hyper_5_128_chunk, _dist_hyper_5_131_chunk
+        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[38, 39, 40])
 (21 rows)
 
 set timescaledb.remote_data_fetcher = 'cursor';
@@ -381,20 +381,20 @@ select * from metrics_dist_ss;
         Output: metrics_dist_ss_1."time", metrics_dist_ss_1.device_id, metrics_dist_ss_1.v0, metrics_dist_ss_1.v1, metrics_dist_ss_1.v2, metrics_dist_ss_1.v3
         Data node: db_dist_remote_error_1
         Fetcher Type: Cursor
-        Chunks: _dist_hyper_5_96_chunk, _dist_hyper_5_99_chunk, _dist_hyper_5_102_chunk
-        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[54, 55, 56])
+        Chunks: _dist_hyper_5_123_chunk, _dist_hyper_5_126_chunk, _dist_hyper_5_129_chunk
+        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[57, 58, 59])
   ->  Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_2 (actual rows=13680 loops=1)
         Output: metrics_dist_ss_2."time", metrics_dist_ss_2.device_id, metrics_dist_ss_2.v0, metrics_dist_ss_2.v1, metrics_dist_ss_2.v2, metrics_dist_ss_2.v3
         Data node: db_dist_remote_error_2
         Fetcher Type: Cursor
-        Chunks: _dist_hyper_5_97_chunk, _dist_hyper_5_100_chunk, _dist_hyper_5_103_chunk
-        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[48, 49, 50])
+        Chunks: _dist_hyper_5_124_chunk, _dist_hyper_5_127_chunk, _dist_hyper_5_130_chunk
+        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[49, 50, 51])
   ->  Custom Scan (DataNodeScan) on public.metrics_dist_ss metrics_dist_ss_3 (actual rows=4560 loops=1)
         Output: metrics_dist_ss_3."time", metrics_dist_ss_3.device_id, metrics_dist_ss_3.v0, metrics_dist_ss_3.v1, metrics_dist_ss_3.v2, metrics_dist_ss_3.v3
         Data node: db_dist_remote_error_3
         Fetcher Type: Cursor
-        Chunks: _dist_hyper_5_98_chunk, _dist_hyper_5_101_chunk, _dist_hyper_5_104_chunk
-        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[32, 33, 34])
+        Chunks: _dist_hyper_5_125_chunk, _dist_hyper_5_128_chunk, _dist_hyper_5_131_chunk
+        Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist_ss WHERE _timescaledb_internal.chunks_in(public.metrics_dist_ss.*, ARRAY[38, 39, 40])
 (21 rows)
 
 -- Incorrect int output, to cover the error handling in tuplefactory.
diff --git a/tsl/test/expected/remote_copy-12.out b/tsl/test/expected/remote_copy-12.out
index d1b66b997..a404a7c3a 100644
--- a/tsl/test/expected/remote_copy-12.out
+++ b/tsl/test/expected/remote_copy-12.out
@@ -142,14 +142,14 @@ SELECT * FROM _timescaledb_catalog.chunk ORDER BY 1;
   6 | 1 | _timescaledb_internal | _dist_hyper_1_6_chunk | | f | 0 | f
   7 | 1 | _timescaledb_internal | _dist_hyper_1_7_chunk | | f | 0 | f
   8 | 1 | _timescaledb_internal | _dist_hyper_1_8_chunk | | f | 0 | f
-  9 | 1 | _timescaledb_internal | _dist_hyper_1_9_chunk | | f | 0 | f
- 10 | 1 | _timescaledb_internal | _dist_hyper_1_10_chunk | | f | 0 | f
- 11 | 1 | _timescaledb_internal | _dist_hyper_1_11_chunk | | f | 0 | f
  12 | 1 | _timescaledb_internal | _dist_hyper_1_12_chunk | | f | 0 | f
  13 | 1 | _timescaledb_internal | _dist_hyper_1_13_chunk | | f | 0 | f
  14 | 1 | _timescaledb_internal | _dist_hyper_1_14_chunk | | f | 0 | f
  15 | 1 | _timescaledb_internal | _dist_hyper_1_15_chunk | | f | 0 | f
  16 | 1 | _timescaledb_internal | _dist_hyper_1_16_chunk | | f | 0 | f
+ 17 | 1 | _timescaledb_internal | _dist_hyper_1_17_chunk | | f | 0 | f
+ 18 | 1 | _timescaledb_internal | _dist_hyper_1_18_chunk | | f | 0 | f
+ 19 | 1 | _timescaledb_internal | _dist_hyper_1_19_chunk | | f | 0 | f
 (16 rows)
 
 SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
@@ -171,22 +171,22 @@ SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
   7 |  5 | db_remote_copy_2
   8 |  8 | db_remote_copy_1
   8 |  6 | db_remote_copy_2
-  9 |  9 | db_remote_copy_1
-  9 |  7 | db_remote_copy_2
- 10 | 10 | db_remote_copy_1
- 10 |  8 | db_remote_copy_2
- 11 | 11 | db_remote_copy_1
- 11 |  9 | db_remote_copy_2
- 12 | 12 | db_remote_copy_1
- 12 | 10 | db_remote_copy_2
- 13 | 13 | db_remote_copy_1
- 13 | 11 | db_remote_copy_2
- 14 | 14 | db_remote_copy_1
- 14 | 12 | db_remote_copy_2
- 15 | 15 | db_remote_copy_1
- 15 | 13 | db_remote_copy_2
- 16 | 16 | db_remote_copy_1
- 16 |  3 | db_remote_copy_3
+ 12 | 11 | db_remote_copy_1
+ 12 |  8 | db_remote_copy_2
+ 13 | 12 | db_remote_copy_1
+ 13 |  9 | db_remote_copy_2
+ 14 | 13 | db_remote_copy_1
+ 14 | 10 | db_remote_copy_2
+ 15 | 14 | db_remote_copy_1
+ 15 | 11 | db_remote_copy_2
+ 16 | 15 | db_remote_copy_1
+ 16 | 12 | db_remote_copy_2
+ 17 | 16 | db_remote_copy_1
+ 17 | 13 | db_remote_copy_2
+ 18 | 17 | db_remote_copy_1
+ 18 | 14 | db_remote_copy_2
+ 19 | 18 | db_remote_copy_1
+ 19 |  6 | db_remote_copy_3
 (32 rows)
 
 SELECT * FROM _timescaledb_catalog.hypertable_data_node ORDER BY 3;
@@ -208,14 +208,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_1
@@ -253,14 +253,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_2
@@ -293,13 +293,13 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
+ _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
 (13 rows)
 
 \c :DATA_NODE_3
@@ -316,7 +316,7 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
 ----------------------------------------------
  _timescaledb_internal._dist_hyper_1_1_chunk
  _timescaledb_internal._dist_hyper_1_2_chunk
- _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (3 rows)
 
 \c :TEST_DBNAME :ROLE_SUPERUSER;
diff --git a/tsl/test/expected/remote_copy-13.out b/tsl/test/expected/remote_copy-13.out
index ca8782ba4..99c08ec2f 100644
--- a/tsl/test/expected/remote_copy-13.out
+++ b/tsl/test/expected/remote_copy-13.out
@@ -142,14 +142,14 @@ SELECT * FROM _timescaledb_catalog.chunk ORDER BY 1;
   6 | 1 | _timescaledb_internal | _dist_hyper_1_6_chunk | | f | 0 | f
   7 | 1 | _timescaledb_internal | _dist_hyper_1_7_chunk | | f | 0 | f
   8 | 1 | _timescaledb_internal | _dist_hyper_1_8_chunk | | f | 0 | f
-  9 | 1 | _timescaledb_internal | _dist_hyper_1_9_chunk | | f | 0 | f
- 10 | 1 | _timescaledb_internal | _dist_hyper_1_10_chunk | | f | 0 | f
- 11 | 1 | _timescaledb_internal | _dist_hyper_1_11_chunk | | f | 0 | f
  12 | 1 | _timescaledb_internal | _dist_hyper_1_12_chunk | | f | 0 | f
  13 | 1 | _timescaledb_internal | _dist_hyper_1_13_chunk | | f | 0 | f
  14 | 1 | _timescaledb_internal | _dist_hyper_1_14_chunk | | f | 0 | f
  15 | 1 | _timescaledb_internal | _dist_hyper_1_15_chunk | | f | 0 | f
  16 | 1 | _timescaledb_internal | _dist_hyper_1_16_chunk | | f | 0 | f
+ 17 | 1 | _timescaledb_internal | _dist_hyper_1_17_chunk | | f | 0 | f
+ 18 | 1 | _timescaledb_internal | _dist_hyper_1_18_chunk | | f | 0 | f
+ 19 | 1 | _timescaledb_internal | _dist_hyper_1_19_chunk | | f | 0 | f
 (16 rows)
 
 SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
@@ -171,22 +171,22 @@ SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
   7 |  5 | db_remote_copy_2
   8 |  8 | db_remote_copy_1
   8 |  6 | db_remote_copy_2
-  9 |  9 | db_remote_copy_1
-  9 |  7 | db_remote_copy_2
- 10 | 10 | db_remote_copy_1
- 10 |  8 | db_remote_copy_2
- 11 | 11 | db_remote_copy_1
- 11 |  9 | db_remote_copy_2
- 12 | 12 | db_remote_copy_1
- 12 | 10 | db_remote_copy_2
- 13 | 13 | db_remote_copy_1
- 13 | 11 | db_remote_copy_2
- 14 | 14 | db_remote_copy_1
- 14 | 12 | db_remote_copy_2
- 15 | 15 | db_remote_copy_1
- 15 | 13 | db_remote_copy_2
- 16 | 16 | db_remote_copy_1
- 16 |  3 | db_remote_copy_3
+ 12 | 11 | db_remote_copy_1
+ 12 |  8 | db_remote_copy_2
+ 13 | 12 | db_remote_copy_1
+ 13 |  9 | db_remote_copy_2
+ 14 | 13 | db_remote_copy_1
+ 14 | 10 | db_remote_copy_2
+ 15 | 14 | db_remote_copy_1
+ 15 | 11 | db_remote_copy_2
+ 16 | 15 | db_remote_copy_1
+ 16 | 12 | db_remote_copy_2
+ 17 | 16 | db_remote_copy_1
+ 17 | 13 | db_remote_copy_2
+ 18 | 17 | db_remote_copy_1
+ 18 | 14 | db_remote_copy_2
+ 19 | 18 | db_remote_copy_1
+ 19 |  6 | db_remote_copy_3
 (32 rows)
 
 SELECT * FROM _timescaledb_catalog.hypertable_data_node ORDER BY 3;
@@ -208,14 +208,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_1
@@ -253,14 +253,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_2
@@ -293,13 +293,13 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
+ _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
 (13 rows)
 
 \c :DATA_NODE_3
@@ -316,7 +316,7 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
 ----------------------------------------------
  _timescaledb_internal._dist_hyper_1_1_chunk
  _timescaledb_internal._dist_hyper_1_2_chunk
- _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (3 rows)
 
 \c :TEST_DBNAME :ROLE_SUPERUSER;
diff --git a/tsl/test/expected/remote_copy-14.out b/tsl/test/expected/remote_copy-14.out
index ca8782ba4..99c08ec2f 100644
--- a/tsl/test/expected/remote_copy-14.out
+++ b/tsl/test/expected/remote_copy-14.out
@@ -142,14 +142,14 @@ SELECT * FROM _timescaledb_catalog.chunk ORDER BY 1;
   6 | 1 | _timescaledb_internal | _dist_hyper_1_6_chunk | | f | 0 | f
   7 | 1 | _timescaledb_internal | _dist_hyper_1_7_chunk | | f | 0 | f
   8 | 1 | _timescaledb_internal | _dist_hyper_1_8_chunk | | f | 0 | f
-  9 | 1 | _timescaledb_internal | _dist_hyper_1_9_chunk | | f | 0 | f
- 10 | 1 | _timescaledb_internal | _dist_hyper_1_10_chunk | | f | 0 | f
- 11 | 1 | _timescaledb_internal | _dist_hyper_1_11_chunk | | f | 0 | f
  12 | 1 | _timescaledb_internal | _dist_hyper_1_12_chunk | | f | 0 | f
  13 | 1 | _timescaledb_internal | _dist_hyper_1_13_chunk | | f | 0 | f
  14 | 1 | _timescaledb_internal | _dist_hyper_1_14_chunk | | f | 0 | f
  15 | 1 | _timescaledb_internal | _dist_hyper_1_15_chunk | | f | 0 | f
  16 | 1 | _timescaledb_internal | _dist_hyper_1_16_chunk | | f | 0 | f
+ 17 | 1 | _timescaledb_internal | _dist_hyper_1_17_chunk | | f | 0 | f
+ 18 | 1 | _timescaledb_internal | _dist_hyper_1_18_chunk | | f | 0 | f
+ 19 | 1 | _timescaledb_internal | _dist_hyper_1_19_chunk | | f | 0 | f
 (16 rows)
 
 SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
@@ -171,22 +171,22 @@ SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
   7 |  5 | db_remote_copy_2
   8 |  8 | db_remote_copy_1
   8 |  6 | db_remote_copy_2
-  9 |  9 | db_remote_copy_1
-  9 |  7 | db_remote_copy_2
- 10 | 10 | db_remote_copy_1
- 10 |  8 | db_remote_copy_2
- 11 | 11 | db_remote_copy_1
- 11 |  9 | db_remote_copy_2
- 12 | 12 | db_remote_copy_1
- 12 | 10 | db_remote_copy_2
- 13 | 13 | db_remote_copy_1
- 13 | 11 | db_remote_copy_2
- 14 | 14 | db_remote_copy_1
- 14 | 12 | db_remote_copy_2
- 15 | 15 | db_remote_copy_1
- 15 | 13 | db_remote_copy_2
- 16 | 16 | db_remote_copy_1
- 16 |  3 | db_remote_copy_3
+ 12 | 11 | db_remote_copy_1
+ 12 |  8 | db_remote_copy_2
+ 13 | 12 | db_remote_copy_1
+ 13 |  9 | db_remote_copy_2
+ 14 | 13 | db_remote_copy_1
+ 14 | 10 | db_remote_copy_2
+ 15 | 14 | db_remote_copy_1
+ 15 | 11 | db_remote_copy_2
+ 16 | 15 | db_remote_copy_1
+ 16 | 12 | db_remote_copy_2
+ 17 | 16 | db_remote_copy_1
+ 17 | 13 | db_remote_copy_2
+ 18 | 17 | db_remote_copy_1
+ 18 | 14 | db_remote_copy_2
+ 19 | 18 | db_remote_copy_1
+ 19 |  6 | db_remote_copy_3
 (32 rows)
 
 SELECT * FROM _timescaledb_catalog.hypertable_data_node ORDER BY 3;
@@ -208,14 +208,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_1
@@ -253,14 +253,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_2
@@ -293,13 +293,13 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
+ _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
 (13 rows)
 
 \c :DATA_NODE_3
@@ -316,7 +316,7 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
 ----------------------------------------------
  _timescaledb_internal._dist_hyper_1_1_chunk
  _timescaledb_internal._dist_hyper_1_2_chunk
- _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (3 rows)
 
 \c :TEST_DBNAME :ROLE_SUPERUSER;
diff --git a/tsl/test/expected/remote_copy-15.out b/tsl/test/expected/remote_copy-15.out
index ca8782ba4..99c08ec2f 100644
--- a/tsl/test/expected/remote_copy-15.out
+++ b/tsl/test/expected/remote_copy-15.out
@@ -142,14 +142,14 @@ SELECT * FROM _timescaledb_catalog.chunk ORDER BY 1;
   6 | 1 | _timescaledb_internal | _dist_hyper_1_6_chunk | | f | 0 | f
   7 | 1 | _timescaledb_internal | _dist_hyper_1_7_chunk | | f | 0 | f
   8 | 1 | _timescaledb_internal | _dist_hyper_1_8_chunk | | f | 0 | f
-  9 | 1 | _timescaledb_internal | _dist_hyper_1_9_chunk | | f | 0 | f
- 10 | 1 | _timescaledb_internal | _dist_hyper_1_10_chunk | | f | 0 | f
- 11 | 1 | _timescaledb_internal | _dist_hyper_1_11_chunk | | f | 0 | f
  12 | 1 | _timescaledb_internal | _dist_hyper_1_12_chunk | | f | 0 | f
  13 | 1 | _timescaledb_internal | _dist_hyper_1_13_chunk | | f | 0 | f
  14 | 1 | _timescaledb_internal | _dist_hyper_1_14_chunk | | f | 0 | f
  15 | 1 | _timescaledb_internal | _dist_hyper_1_15_chunk | | f | 0 | f
  16 | 1 | _timescaledb_internal | _dist_hyper_1_16_chunk | | f | 0 | f
+ 17 | 1 | _timescaledb_internal | _dist_hyper_1_17_chunk | | f | 0 | f
+ 18 | 1 | _timescaledb_internal | _dist_hyper_1_18_chunk | | f | 0 | f
+ 19 | 1 | _timescaledb_internal | _dist_hyper_1_19_chunk | | f | 0 | f
 (16 rows)
 
 SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
@@ -171,22 +171,22 @@ SELECT * FROM _timescaledb_catalog.chunk_data_node ORDER BY 1, 3;
   7 |  5 | db_remote_copy_2
   8 |  8 | db_remote_copy_1
   8 |  6 | db_remote_copy_2
-  9 |  9 | db_remote_copy_1
-  9 |  7 | db_remote_copy_2
- 10 | 10 | db_remote_copy_1
- 10 |  8 | db_remote_copy_2
- 11 | 11 | db_remote_copy_1
- 11 |  9 | db_remote_copy_2
- 12 | 12 | db_remote_copy_1
- 12 | 10 | db_remote_copy_2
- 13 | 13 | db_remote_copy_1
- 13 | 11 | db_remote_copy_2
- 14 | 14 | db_remote_copy_1
- 14 | 12 | db_remote_copy_2
- 15 | 15 | db_remote_copy_1
- 15 | 13 | db_remote_copy_2
- 16 | 16 | db_remote_copy_1
- 16 |  3 | db_remote_copy_3
+ 12 | 11 | db_remote_copy_1
+ 12 |  8 | db_remote_copy_2
+ 13 | 12 | db_remote_copy_1
+ 13 |  9 | db_remote_copy_2
+ 14 | 13 | db_remote_copy_1
+ 14 | 10 | db_remote_copy_2
+ 15 | 14 | db_remote_copy_1
+ 15 | 11 | db_remote_copy_2
+ 16 | 15 | db_remote_copy_1
+ 16 | 12 | db_remote_copy_2
+ 17 | 16 | db_remote_copy_1
+ 17 | 13 | db_remote_copy_2
+ 18 | 17 | db_remote_copy_1
+ 18 | 14 | db_remote_copy_2
+ 19 | 18 | db_remote_copy_1
+ 19 |  6 | db_remote_copy_3
 (32 rows)
 
 SELECT * FROM _timescaledb_catalog.hypertable_data_node ORDER BY 3;
@@ -208,14 +208,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_1
@@ -253,14 +253,14 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
  _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (16 rows)
 
 \c :DATA_NODE_2
@@ -293,13 +293,13 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
  _timescaledb_internal._dist_hyper_1_6_chunk
  _timescaledb_internal._dist_hyper_1_7_chunk
  _timescaledb_internal._dist_hyper_1_8_chunk
- _timescaledb_internal._dist_hyper_1_9_chunk
- _timescaledb_internal._dist_hyper_1_10_chunk
- _timescaledb_internal._dist_hyper_1_11_chunk
  _timescaledb_internal._dist_hyper_1_12_chunk
  _timescaledb_internal._dist_hyper_1_13_chunk
  _timescaledb_internal._dist_hyper_1_14_chunk
  _timescaledb_internal._dist_hyper_1_15_chunk
+ _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_17_chunk
+ _timescaledb_internal._dist_hyper_1_18_chunk
 (13 rows)
 
 \c :DATA_NODE_3
@@ -316,7 +316,7 @@ select * from show_chunks('"+ri(k33_'')"') ORDER BY 1;
 ----------------------------------------------
  _timescaledb_internal._dist_hyper_1_1_chunk
  _timescaledb_internal._dist_hyper_1_2_chunk
- _timescaledb_internal._dist_hyper_1_16_chunk
+ _timescaledb_internal._dist_hyper_1_19_chunk
 (3 rows)
 
 \c :TEST_DBNAME :ROLE_SUPERUSER;
diff --git a/tsl/test/sql/dist_copy_format_long.sql b/tsl/test/sql/dist_copy_format_long.sql
index 1a5d232b8..16fb8aa45 100644
--- a/tsl/test/sql/dist_copy_format_long.sql
+++ b/tsl/test/sql/dist_copy_format_long.sql
@@ -20,9 +20,12 @@ SELECT 1 FROM add_data_node('data_node_3', host => 'localhost',
 GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC;
 -- though user on access node has required GRANTS, this will propagate GRANTS to the connected data nodes
 GRANT CREATE ON SCHEMA public TO :ROLE_1;
+-- buffer a lot of rows per message to test buffer expansion
+ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD copy_rows_per_message '1000');
 SET ROLE :ROLE_1;
+
 -- Aim to about 100 partitions, the data is from 1995 to 2022.
 create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text,
     type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text,
     locality text, town text, district text, country text, category smallint);
 select create_distributed_hypertable('uk_price_paid', 'date', 'postcode2',
diff --git a/tsl/test/sql/remote_copy.sql.in b/tsl/test/sql/remote_copy.sql.in
index 6b1f666eb..f5e81392f 100644
--- a/tsl/test/sql/remote_copy.sql.in
+++ b/tsl/test/sql/remote_copy.sql.in
@@ -16,7 +16,6 @@ FROM (
 ) a;
 GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC;
 GRANT CREATE ON SCHEMA public TO :ROLE_1;
-
 SET timescaledb.hide_data_node_name_in_errors = 'on';
 -- Start out testing text copy code
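
For reference, a minimal usage sketch of the new copy_rows_per_message foreign data wrapper option that the dist_copy_format_long.sql hunk above exercises. The values below are arbitrary examples chosen for illustration, not recommendations; per this patch the option defaults to 100 rows per CopyData message.

    -- Buffer more rows into each CopyData message during distributed COPY
    -- ('500' is an arbitrary example value).
    ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD copy_rows_per_message '500');
    -- Adjust an existing setting, or drop it to fall back to the default.
    ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET copy_rows_per_message '200');
    ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (DROP copy_rows_per_message);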