Improve memory handling for remote COPY

This change improves memory usage in the `COPY` code used for
distributed hypertables. The following issues have been addressed:

* `PGresult` objects were not cleared, leading to memory leaks.
* The caching of chunk connections didn't work since the lookup
  compared ephemeral chunk pointers instead of chunk IDs. The effect
  was that cached chunk connection state was reallocated every time
  instead of being reused. This likely also caused worse performance.

To address these issues, the following changes are made:

* All `PGresult` objects are now cleared with `PQclear`.
* Lookup for chunk connections now compares chunk IDs instead of chunk
  pointers.
* The per-tuple memory context is moved the to the outer processing
  loop to ensure that everything in the loop is allocated on the
  per-tuple memory context, which is also reset at every iteration of
  the loop.
* The use of memory contexts is also simplified to have only one
  memory context for state that should survive across resets of the
  per-tuple memory context.

Fixes #2677
This commit is contained in:
Erik Nordström 2020-12-01 14:28:34 +01:00 committed by Erik Nordström
parent 7c76fd4d09
commit 2ecb53e7bb
6 changed files with 182 additions and 115 deletions

View File

@ -441,8 +441,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, void (*call
/* Close any trigger target relations */ /* Close any trigger target relations */
ExecCleanUpTriggerState(estate); ExecCleanUpTriggerState(estate);
copy_chunk_state_destroy(ccstate);
/* /*
* If we skipped writing WAL, then we need to sync the heap (but not * If we skipped writing WAL, then we need to sync the heap (but not
* indexes since those use WAL anyway) * indexes since those use WAL anyway)
@ -663,10 +661,11 @@ timescaledb_DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *proces
ccstate->where_clause = where_clause; ccstate->where_clause = where_clause;
if (hypertable_is_distributed(ht)) if (hypertable_is_distributed(ht))
ts_cm_functions->distributed_copy(stmt, processed, ccstate, attnums); *processed = ts_cm_functions->distributed_copy(stmt, ccstate, attnums);
else else
*processed = copyfrom(ccstate, pstate->p_rtable, ht, CopyFromErrorCallback, cstate); *processed = copyfrom(ccstate, pstate->p_rtable, ht, CopyFromErrorCallback, cstate);
copy_chunk_state_destroy(ccstate);
EndCopyFrom(cstate); EndCopyFrom(cstate);
free_parsestate(pstate); free_parsestate(pstate);
table_close(rel, NoLock); table_close(rel, NoLock);
@ -732,6 +731,7 @@ timescaledb_move_from_table_to_chunks(Hypertable *ht, LOCKMODE lockmode)
scandesc = table_beginscan(rel, snapshot, 0, NULL); scandesc = table_beginscan(rel, snapshot, 0, NULL);
ccstate = copy_chunk_state_create(ht, rel, next_copy_from_table_to_chunks, NULL, scandesc); ccstate = copy_chunk_state_create(ht, rel, next_copy_from_table_to_chunks, NULL, scandesc);
copyfrom(ccstate, pstate->p_rtable, ht, copy_table_to_chunk_error_callback, scandesc); copyfrom(ccstate, pstate->p_rtable, ht, copy_table_to_chunk_error_callback, scandesc);
copy_chunk_state_destroy(ccstate);
heap_endscan(scandesc); heap_endscan(scandesc);
UnregisterSnapshot(snapshot); UnregisterSnapshot(snapshot);
table_close(rel, lockmode); table_close(rel, lockmode);

View File

@ -236,11 +236,12 @@ data_node_dispatch_path_create_default(PlannerInfo *root, ModifyTablePath *mtpat
return NULL; return NULL;
} }
static void static uint64
distributed_copy_default(const CopyStmt *stmt, uint64 *processed, CopyChunkState *ccstate, distributed_copy_default(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums)
List *attnums)
{ {
error_no_default_fn_community(); error_no_default_fn_community();
return 0;
} }
static bool static bool

View File

@ -139,8 +139,7 @@ typedef struct CrossModuleFunctions
void (*create_chunk_on_data_nodes)(Chunk *chunk, Hypertable *ht); void (*create_chunk_on_data_nodes)(Chunk *chunk, Hypertable *ht);
Path *(*data_node_dispatch_path_create)(PlannerInfo *root, ModifyTablePath *mtpath, Path *(*data_node_dispatch_path_create)(PlannerInfo *root, ModifyTablePath *mtpath,
Index hypertable_rti, int subpath_index); Index hypertable_rti, int subpath_index);
void (*distributed_copy)(const CopyStmt *stmt, uint64 *processed, CopyChunkState *ccstate, uint64 (*distributed_copy)(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums);
List *attnums);
bool (*set_distributed_id)(Datum id); bool (*set_distributed_id)(Datum id);
void (*set_distributed_peer_id)(Datum id); void (*set_distributed_peer_id)(Datum id);
bool (*is_frontend_session)(void); bool (*is_frontend_session)(void);

View File

@ -38,7 +38,7 @@
*/ */
typedef struct ChunkConnectionList typedef struct ChunkConnectionList
{ {
Chunk *chunk; int32 chunk_id;
List *connections; List *connections;
} ChunkConnectionList; } ChunkConnectionList;
@ -63,7 +63,6 @@ typedef struct CopyConnectionState
List *cached_connections; List *cached_connections;
List *connections_in_use; List *connections_in_use;
bool using_binary; bool using_binary;
MemoryContext mctx;
const char *outgoing_copy_cmd; const char *outgoing_copy_cmd;
} CopyConnectionState; } CopyConnectionState;
@ -71,12 +70,10 @@ typedef struct CopyConnectionState
*/ */
typedef struct TextCopyContext typedef struct TextCopyContext
{ {
MemoryContext orig_context;
int ndimensions; int ndimensions;
CopyDimensionInfo *dimensions; CopyDimensionInfo *dimensions;
char delimiter; char delimiter;
char *null_string; char *null_string;
MemoryContext tuple_context;
char **fields; char **fields;
int nfields; int nfields;
} TextCopyContext; } TextCopyContext;
@ -85,9 +82,8 @@ typedef struct TextCopyContext
*/ */
typedef struct BinaryCopyContext typedef struct BinaryCopyContext
{ {
MemoryContext orig_context; ExprContext *econtext;
FmgrInfo *out_functions; FmgrInfo *out_functions;
EState *estate;
Datum *values; Datum *values;
bool *nulls; bool *nulls;
} BinaryCopyContext; } BinaryCopyContext;
@ -98,8 +94,10 @@ typedef struct RemoteCopyContext
{ {
/* Operation data */ /* Operation data */
CopyConnectionState connection_state; CopyConnectionState connection_state;
CopyChunkState *ccstate;
void *data_context; /* TextCopyContext or BinaryCopyContext */ void *data_context; /* TextCopyContext or BinaryCopyContext */
bool binary_operation; bool binary_operation;
MemoryContext mctx; /* MemoryContext that holds the RemoteCopyContext */
/* Data for the current read row */ /* Data for the current read row */
StringInfo row_data; StringInfo row_data;
@ -112,7 +110,7 @@ typedef struct RemoteCopyContext
static CopyDimensionInfo * static CopyDimensionInfo *
generate_copy_dimensions(Dimension *dims, int ndimensions, List *attnums, Hypertable *ht) generate_copy_dimensions(Dimension *dims, int ndimensions, List *attnums, Hypertable *ht)
{ {
CopyDimensionInfo *result = palloc(ndimensions * sizeof(CopyDimensionInfo)); CopyDimensionInfo *result = palloc0(ndimensions * sizeof(CopyDimensionInfo));
int idx; int idx;
for (idx = 0; idx < ndimensions; ++idx) for (idx = 0; idx < ndimensions; ++idx)
@ -254,6 +252,7 @@ static void
start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *connection) start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *connection)
{ {
PGconn *pg_conn = remote_connection_get_pg_conn(connection); PGconn *pg_conn = remote_connection_get_pg_conn(connection);
if (PQisnonblocking(pg_conn)) if (PQisnonblocking(pg_conn))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -261,12 +260,28 @@ start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *co
if (!list_member_ptr(state->connections_in_use, connection)) if (!list_member_ptr(state->connections_in_use, connection))
{ {
PGresult *res = PQexec(pg_conn, state->outgoing_copy_cmd); PGresult *volatile res = NULL;
if (PQresultStatus(res) != PGRES_COPY_IN) PG_TRY();
ereport(ERROR, {
(errcode(ERRCODE_CONNECTION_FAILURE), res = PQexec(pg_conn, state->outgoing_copy_cmd);
errmsg("unable to start remote COPY on data node (%d)", PQresultStatus(res))));
if (PQresultStatus(res) != PGRES_COPY_IN)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unable to start remote COPY on data node"),
errdetail("Remote command error: %s", PQresultErrorMessage(res))));
PQclear(res);
}
PG_CATCH();
{
if (NULL != res)
PQclear(res);
PG_RE_THROW();
}
PG_END_TRY();
if (state->using_binary) if (state->using_binary)
send_binary_copy_header(pg_conn); send_binary_copy_header(pg_conn);
@ -276,15 +291,15 @@ start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *co
} }
static ChunkConnectionList * static ChunkConnectionList *
create_connection_list_for_chunk(CopyConnectionState *state, Chunk *chunk) create_connection_list_for_chunk(CopyConnectionState *state, const Chunk *chunk)
{ {
ChunkConnectionList *chunk_connections; ChunkConnectionList *chunk_connections;
ListCell *lc; ListCell *lc;
MemoryContext oldcontext = MemoryContextSwitchTo(state->mctx); chunk_connections = palloc0(sizeof(ChunkConnectionList));
chunk_connections = palloc(sizeof(ChunkConnectionList)); chunk_connections->chunk_id = chunk->fd.id;
chunk_connections->chunk = ts_chunk_copy(chunk);
chunk_connections->connections = NIL; chunk_connections->connections = NIL;
foreach (lc, chunk->data_nodes) foreach (lc, chunk->data_nodes)
{ {
ChunkDataNode *cdn = lfirst(lc); ChunkDataNode *cdn = lfirst(lc);
@ -295,7 +310,7 @@ create_connection_list_for_chunk(CopyConnectionState *state, Chunk *chunk)
chunk_connections->connections = lappend(chunk_connections->connections, connection); chunk_connections->connections = lappend(chunk_connections->connections, connection);
} }
state->cached_connections = lappend(state->cached_connections, chunk_connections); state->cached_connections = lappend(state->cached_connections, chunk_connections);
MemoryContextSwitchTo(oldcontext);
return chunk_connections; return chunk_connections;
} }
@ -306,54 +321,122 @@ send_end_binary_copy_data(PGconn *connection)
return PQputCopyData(connection, (char *) &buf, sizeof(buf)); return PQputCopyData(connection, (char *) &buf, sizeof(buf));
} }
static char *
get_error_field_copy(PGresult *res, int fieldcode)
{
const char *msg = PQresultErrorField(res, fieldcode);
if (NULL == msg)
return NULL;
return pchomp(msg);
}
static void
clear_results(const List *results, bool report_error)
{
ListCell *lc;
const char *primary = NULL;
const char *detail = NULL;
const char *hint = NULL;
if (NIL == results)
return;
foreach (lc, results)
{
PGresult *res = lfirst(lc);
if (PQresultStatus(res) != PGRES_COMMAND_OK && NULL == primary && report_error)
{
primary = get_error_field_copy(res, PG_DIAG_MESSAGE_PRIMARY);
detail = get_error_field_copy(res, PG_DIAG_MESSAGE_DETAIL);
hint = get_error_field_copy(res, PG_DIAG_MESSAGE_HINT);
}
PQclear(res);
}
if (NULL != primary)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("%s", primary),
detail == NULL ? 0 : errdetail("%s", detail),
hint == NULL ? 0 : errhint("%s", hint)));
}
static void static void
finish_outstanding_copies(CopyConnectionState *state) finish_outstanding_copies(CopyConnectionState *state)
{ {
ListCell *lc; ListCell *lc;
List *results = NIL; List *volatile results = NIL;
PGresult *volatile res = NULL;
foreach (lc, state->connections_in_use) PG_TRY();
{ {
TSConnection *conn = lfirst(lc); foreach (lc, state->connections_in_use)
PGconn *pg_conn = remote_connection_get_pg_conn(conn); {
PGresult PG_USED_FOR_ASSERTS_ONLY *res; TSConnection *conn = lfirst(lc);
PGconn *pg_conn = remote_connection_get_pg_conn(conn);
if (state->using_binary) if (state->using_binary)
if (send_end_binary_copy_data(pg_conn) != 1) if (send_end_binary_copy_data(pg_conn) != 1)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("%s", PQerrorMessage(pg_conn))));
if (PQputCopyEnd(pg_conn, NULL) == -1)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), (errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("%s", PQerrorMessage(pg_conn)))); errmsg("%s", PQerrorMessage(pg_conn))));
if (PQputCopyEnd(pg_conn, NULL) == -1) res = PQgetResult(pg_conn);
ereport(ERROR, Assert(res != NULL);
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQerrorMessage(pg_conn)))); results = lappend(results, res);
results = lappend(results, PQgetResult(pg_conn)); /* Need to get result a second time to move the connection out of copy mode */
/* Need to get result a second time to move the connection out of copy mode */ res = PQgetResult(pg_conn);
res = PQgetResult(pg_conn);
if (res != NULL) if (res != NULL)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("COPY command resulted in unexpected state"))); errmsg("COPY command resulted in unexpected state")));
}
} }
PG_CATCH();
{
if (res != NULL)
PQclear(res);
foreach (lc, results) clear_results(results, false);
if (PQresultStatus(lfirst(lc)) != PGRES_COMMAND_OK) PG_RE_THROW();
ereport(ERROR, }
(errcode(ERRCODE_INTERNAL_ERROR), PG_END_TRY();
errmsg("error during copy completion: %s", PQresultErrorMessage(lfirst(lc)))));
Assert(res == NULL);
clear_results(results, true);
} }
static List * static List *
get_connections_for_chunk(CopyConnectionState *state, Chunk *chunk) get_connections_for_chunk(RemoteCopyContext *context, const Chunk *chunk)
{ {
CopyConnectionState *state = &context->connection_state;
MemoryContext oldmctx;
ListCell *lc; ListCell *lc;
List *conns;
foreach (lc, state->cached_connections) foreach (lc, state->cached_connections)
if (((ChunkConnectionList *) lfirst(lc))->chunk == chunk) {
return ((ChunkConnectionList *) lfirst(lc))->connections; ChunkConnectionList *chunkconns = lfirst(lc);
return create_connection_list_for_chunk(state, chunk)->connections; if (chunkconns->chunk_id == chunk->fd.id)
return chunkconns->connections;
}
oldmctx = MemoryContextSwitchTo(context->mctx);
conns = create_connection_list_for_chunk(state, chunk)->connections;
MemoryContextSwitchTo(oldmctx);
return conns;
} }
static bool static bool
@ -566,16 +649,12 @@ validate_options(List *copy_options, char *delimiter, char **null_string)
static TextCopyContext * static TextCopyContext *
generate_text_copy_context(const CopyStmt *stmt, Hypertable *ht, List *attnums) generate_text_copy_context(const CopyStmt *stmt, Hypertable *ht, List *attnums)
{ {
TextCopyContext *ctx = palloc(sizeof(TextCopyContext)); TextCopyContext *ctx = palloc0(sizeof(TextCopyContext));
ctx->ndimensions = ht->space->num_dimensions; ctx->ndimensions = ht->space->num_dimensions;
validate_options(stmt->options, &ctx->delimiter, &ctx->null_string); validate_options(stmt->options, &ctx->delimiter, &ctx->null_string);
ctx->dimensions = ctx->dimensions =
generate_copy_dimensions(ht->space->dimensions, ctx->ndimensions, attnums, ht); generate_copy_dimensions(ht->space->dimensions, ctx->ndimensions, attnums, ht);
ctx->tuple_context =
AllocSetContextCreate(CurrentMemoryContext, "COPY", ALLOCSET_DEFAULT_SIZES);
ctx->orig_context = MemoryContextSwitchTo(ctx->tuple_context);
return ctx; return ctx;
} }
@ -590,7 +669,7 @@ get_copy_conversion_functions(Hypertable *ht, List *copy_attnums, FmgrInfo **fun
Relation rel = relation_open(ht->main_table_relid, AccessShareLock); Relation rel = relation_open(ht->main_table_relid, AccessShareLock);
TupleDesc tupDesc = RelationGetDescr(rel); TupleDesc tupDesc = RelationGetDescr(rel);
*functions = palloc(tupDesc->natts * sizeof(FmgrInfo)); *functions = palloc0(tupDesc->natts * sizeof(FmgrInfo));
foreach (lc, copy_attnums) foreach (lc, copy_attnums)
{ {
int offset = AttrNumberGetAttrOffset(lfirst_int(lc)); int offset = AttrNumberGetAttrOffset(lfirst_int(lc));
@ -607,36 +686,47 @@ get_copy_conversion_functions(Hypertable *ht, List *copy_attnums, FmgrInfo **fun
} }
static BinaryCopyContext * static BinaryCopyContext *
generate_binary_copy_context(const CopyStmt *stmt, Hypertable *ht, List *attnums) generate_binary_copy_context(const CopyStmt *stmt, ExprContext *econtext, Hypertable *ht,
List *attnums)
{ {
BinaryCopyContext *ctx = palloc(sizeof(BinaryCopyContext)); BinaryCopyContext *ctx = palloc0(sizeof(BinaryCopyContext));
int columns = get_copy_conversion_functions(ht, attnums, &ctx->out_functions); int columns = get_copy_conversion_functions(ht, attnums, &ctx->out_functions);
ctx->estate = CreateExecutorState();
ctx->values = palloc(columns * sizeof(Datum));
ctx->nulls = palloc(columns * sizeof(bool));
ctx->orig_context = MemoryContextSwitchTo(GetPerTupleMemoryContext(ctx->estate)); ctx->econtext = econtext;
ctx->values = palloc0(columns * sizeof(Datum));
ctx->nulls = palloc0(columns * sizeof(bool));
return ctx; return ctx;
} }
static RemoteCopyContext * static RemoteCopyContext *
begin_remote_copy_operation(const CopyStmt *stmt, Hypertable *ht, List *attnums) begin_remote_copy_operation(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums)
{ {
RemoteCopyContext *context = palloc(sizeof(RemoteCopyContext));
bool binary_copy = copy_should_send_binary(); bool binary_copy = copy_should_send_binary();
MemoryContext mctx =
AllocSetContextCreate(CurrentMemoryContext, "Remote COPY", ALLOCSET_DEFAULT_SIZES);
Hypertable *ht = ccstate->dispatch->hypertable;
ExprContext *econtext = GetPerTupleExprContext(ccstate->estate);
RemoteCopyContext *context;
MemoryContext oldmctx;
oldmctx = MemoryContextSwitchTo(mctx);
context = palloc0(sizeof(RemoteCopyContext));
context->ccstate = ccstate;
context->mctx = mctx;
context->binary_operation = binary_copy; context->binary_operation = binary_copy;
context->connection_state.connections_in_use = NIL; context->connection_state.connections_in_use = NIL;
context->connection_state.cached_connections = NIL; context->connection_state.cached_connections = NIL;
context->connection_state.mctx = CurrentMemoryContext;
context->connection_state.using_binary = binary_copy; context->connection_state.using_binary = binary_copy;
context->connection_state.outgoing_copy_cmd = deparse_copy_cmd(stmt, ht, binary_copy); context->connection_state.outgoing_copy_cmd = deparse_copy_cmd(stmt, ht, binary_copy);
if (binary_copy) if (binary_copy)
context->data_context = generate_binary_copy_context(stmt, ht, attnums); context->data_context = generate_binary_copy_context(stmt, econtext, ht, attnums);
else else
context->data_context = generate_text_copy_context(stmt, ht, attnums); context->data_context = generate_text_copy_context(stmt, ht, attnums);
MemoryContextSwitchTo(oldmctx);
return context; return context;
} }
@ -646,8 +736,6 @@ parse_next_text_row(CopyState cstate, List *attnums, TextCopyContext *ctx)
StringInfo row_data; StringInfo row_data;
int i; int i;
MemoryContextReset(ctx->tuple_context);
if (!NextCopyFromRawFields(cstate, &ctx->fields, &ctx->nfields)) if (!NextCopyFromRawFields(cstate, &ctx->fields, &ctx->nfields))
return NULL; return NULL;
@ -708,13 +796,11 @@ generate_binary_copy_data(Datum *values, bool *nulls, List *attnums, FmgrInfo *o
static StringInfo static StringInfo
parse_next_binary_row(CopyState cstate, List *attnums, BinaryCopyContext *ctx) parse_next_binary_row(CopyState cstate, List *attnums, BinaryCopyContext *ctx)
{ {
ResetPerTupleExprContext(ctx->estate);
#if PG12_GE #if PG12_GE
if (!NextCopyFrom(cstate, GetPerTupleExprContext(ctx->estate), ctx->values, ctx->nulls)) if (!NextCopyFrom(cstate, ctx->econtext, ctx->values, ctx->nulls))
return NULL; return NULL;
#else #else
if (!NextCopyFrom(cstate, GetPerTupleExprContext(ctx->estate), ctx->values, ctx->nulls, NULL)) if (!NextCopyFrom(cstate, ctx->econtext, ctx->values, ctx->nulls, NULL))
return NULL; return NULL;
#endif #endif
@ -780,10 +866,10 @@ reset_copy_connection_state(CopyConnectionState *state)
state->connections_in_use = NIL; state->connections_in_use = NIL;
} }
static Chunk * static const Chunk *
get_target_chunk(Hypertable *ht, Point *p, CopyConnectionState *state) get_target_chunk(Hypertable *ht, Point *p, CopyConnectionState *state)
{ {
Chunk *chunk = ts_hypertable_find_chunk_if_exists(ht, p); const Chunk *chunk = ts_hypertable_find_chunk_if_exists(ht, p);
if (chunk == NULL) if (chunk == NULL)
{ {
@ -818,10 +904,11 @@ send_copy_data(StringInfo row_data, List *connections)
} }
static void static void
process_and_send_copy_data(RemoteCopyContext *context, Hypertable *ht) process_and_send_copy_data(RemoteCopyContext *context)
{ {
Hypertable *ht = context->ccstate->dispatch->hypertable;
const Chunk *chunk;
Point *point; Point *point;
Chunk *chunk;
List *connections; List *connections;
if (context->binary_operation) if (context->binary_operation)
@ -830,54 +917,38 @@ process_and_send_copy_data(RemoteCopyContext *context, Hypertable *ht)
point = get_current_point_for_text_copy(ht, context->data_context); point = get_current_point_for_text_copy(ht, context->data_context);
chunk = get_target_chunk(ht, point, &context->connection_state); chunk = get_target_chunk(ht, point, &context->connection_state);
connections = get_connections_for_chunk(&context->connection_state, chunk); connections = get_connections_for_chunk(context, chunk);
send_copy_data(context->row_data, connections); send_copy_data(context->row_data, connections);
} }
static void
cleanup_text_copy_context(TextCopyContext *ctx)
{
MemoryContextSwitchTo(ctx->orig_context);
MemoryContextDelete(ctx->tuple_context);
}
static void
cleanup_binary_copy_context(BinaryCopyContext *ctx)
{
MemoryContextSwitchTo(ctx->orig_context);
FreeExecutorState(ctx->estate);
}
static void static void
end_copy_operation(RemoteCopyContext *context) end_copy_operation(RemoteCopyContext *context)
{ {
finish_outstanding_copies(&context->connection_state); finish_outstanding_copies(&context->connection_state);
if (context->binary_operation) MemoryContextDelete(context->mctx);
cleanup_binary_copy_context(context->data_context);
else
cleanup_text_copy_context(context->data_context);
} }
void uint64
remote_distributed_copy(const CopyStmt *stmt, uint64 *processed, CopyChunkState *ccstate, remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums)
List *attnums)
{ {
Hypertable *ht = ccstate->dispatch->hypertable; MemoryContext oldmctx = CurrentMemoryContext;
RemoteCopyContext *context = begin_remote_copy_operation(stmt, ht, attnums); RemoteCopyContext *context = begin_remote_copy_operation(stmt, ccstate, attnums);
uint64 processed = 0;
*processed = 0;
PG_TRY(); PG_TRY();
{ {
while (true) while (true)
{ {
ResetPerTupleExprContext(ccstate->estate);
MemoryContextSwitchTo(GetPerTupleMemoryContext(ccstate->estate));
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
if (!read_next_copy_row(context, ccstate->cstate, attnums)) if (!read_next_copy_row(context, ccstate->cstate, attnums))
break; break;
process_and_send_copy_data(context, ht); process_and_send_copy_data(context);
++*processed; ++processed;
} }
} }
PG_CATCH(); PG_CATCH();
@ -890,4 +961,7 @@ remote_distributed_copy(const CopyStmt *stmt, uint64 *processed, CopyChunkState
PG_END_TRY(); PG_END_TRY();
end_copy_operation(context); end_copy_operation(context);
MemoryContextSwitchTo(oldmctx);
return processed;
} }

View File

@ -12,7 +12,6 @@
typedef struct Hypertable Hypertable; typedef struct Hypertable Hypertable;
typedef struct CopyChunkState CopyChunkState; typedef struct CopyChunkState CopyChunkState;
extern void remote_distributed_copy(const CopyStmt *stmt, uint64 *processed, extern uint64 remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums);
CopyChunkState *ccstate, List *attnums);
#endif /* TIMESCALEDB_TSL_REMOTE_DIST_COPY_H */ #endif /* TIMESCALEDB_TSL_REMOTE_DIST_COPY_H */

View File

@ -66,15 +66,9 @@ ERROR: unable to use default value for partitioning column "pH"
-- Missing required column, these generate a WARNING with a transaction id in them (too flimsy to output) -- Missing required column, these generate a WARNING with a transaction id in them (too flimsy to output)
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
COPY "+ri(k33_')" (thyme, flavor, "pH") FROM STDIN WITH DELIMITER ','; COPY "+ri(k33_')" (thyme, flavor, "pH") FROM STDIN WITH DELIMITER ',';
ERROR: error during copy completion: ERROR: null value in column "))_" violates not-null constraint ERROR: null value in column "))_" violates not-null constraint
DETAIL: Failing row contains (5, null, blue, 2, null).
CONTEXT: COPY +ri(k33_'), line 1: "5,blue,2.0"
COPY "+ri(k33_')" FROM STDIN WITH DELIMITER ','; COPY "+ri(k33_')" FROM STDIN WITH DELIMITER ',';
ERROR: error during copy completion: ERROR: null value in column "))_" violates not-null constraint ERROR: null value in column "))_" violates not-null constraint
DETAIL: Failing row contains (5, null, blue, 1, blah).
CONTEXT: COPY +ri(k33_'), line 1: "5,\N,blue,1,blah"
SET client_min_messages TO INFO; SET client_min_messages TO INFO;
-- Invalid data after new chunk creation, data and chunks should be rolled back -- Invalid data after new chunk creation, data and chunks should be rolled back
COPY "+ri(k33_')" FROM STDIN WITH DELIMITER ','; COPY "+ri(k33_')" FROM STDIN WITH DELIMITER ',';